t/io_uring: link with libaio when necessary
[fio.git] / helper_thread.c
1 #include <signal.h>
2 #include <unistd.h>
3 #ifdef CONFIG_HAVE_TIMERFD_CREATE
4 #include <sys/timerfd.h>
5 #endif
6 #ifdef CONFIG_VALGRIND_DEV
7 #include <valgrind/drd.h>
8 #else
9 #define DRD_IGNORE_VAR(x) do { } while (0)
10 #endif
11
12 #ifdef WIN32
13 #include "os/os-windows.h"
14 #endif
15
16 #include "fio.h"
17 #include "smalloc.h"
18 #include "helper_thread.h"
19 #include "steadystate.h"
20 #include "pshared.h"
21
22 static int sleep_accuracy_ms;
23 static int timerfd = -1;
24
25 enum action {
26         A_EXIT          = 1,
27         A_RESET         = 2,
28         A_DO_STAT       = 3,
29 };
30
31 static struct helper_data {
32         volatile int exit;
33         int pipe[2]; /* 0: read end; 1: write end. */
34         struct sk_out *sk_out;
35         pthread_t thread;
36         struct fio_sem *startup_sem;
37 } *helper_data;
38
39 struct interval_timer {
40         const char      *name;
41         struct timespec expires;
42         uint32_t        interval_ms;
43         int             (*func)(void);
44 };
45
46 void helper_thread_destroy(void)
47 {
48         if (!helper_data)
49                 return;
50
51         close(helper_data->pipe[0]);
52         close(helper_data->pipe[1]);
53         sfree(helper_data);
54 }
55
56 #ifdef _WIN32
57 static void sock_init(void)
58 {
59         WSADATA wsaData;
60         int res;
61
62         /* It is allowed to call WSAStartup() more than once. */
63         res = WSAStartup(MAKEWORD(2, 2), &wsaData);
64         assert(res == 0);
65 }
66
67 static int make_nonblocking(int fd)
68 {
69         unsigned long arg = 1;
70
71         return ioctlsocket(fd, FIONBIO, &arg);
72 }
73
74 static int write_to_pipe(int fd, const void *buf, size_t len)
75 {
76         return send(fd, buf, len, 0);
77 }
78
79 static int read_from_pipe(int fd, void *buf, size_t len)
80 {
81         return recv(fd, buf, len, 0);
82 }
83 #else
84 static void sock_init(void)
85 {
86 }
87
88 static int make_nonblocking(int fd)
89 {
90         return fcntl(fd, F_SETFL, O_NONBLOCK);
91 }
92
93 static int write_to_pipe(int fd, const void *buf, size_t len)
94 {
95         return write(fd, buf, len);
96 }
97
98 static int read_from_pipe(int fd, void *buf, size_t len)
99 {
100         return read(fd, buf, len);
101 }
102 #endif
103
104 static void block_signals(void)
105 {
106 #ifdef HAVE_PTHREAD_SIGMASK
107         sigset_t sigmask;
108
109         ret = pthread_sigmask(SIG_UNBLOCK, NULL, &sigmask);
110         assert(ret == 0);
111         ret = pthread_sigmask(SIG_BLOCK, &sigmask, NULL);
112         assert(ret == 0);
113 #endif
114 }
115
116 static void submit_action(enum action a)
117 {
118         const char data = a;
119         int ret;
120
121         if (!helper_data)
122                 return;
123
124         ret = write_to_pipe(helper_data->pipe[1], &data, sizeof(data));
125         assert(ret == 1);
126 }
127
128 void helper_reset(void)
129 {
130         submit_action(A_RESET);
131 }
132
133 /*
134  * May be invoked in signal handler context and hence must only call functions
135  * that are async-signal-safe. See also
136  * https://pubs.opengroup.org/onlinepubs/9699919799/functions/V2_chap02.html#tag_15_04_03.
137  */
138 void helper_do_stat(void)
139 {
140         submit_action(A_DO_STAT);
141 }
142
143 bool helper_should_exit(void)
144 {
145         if (!helper_data)
146                 return true;
147
148         return helper_data->exit;
149 }
150
151 void helper_thread_exit(void)
152 {
153         if (!helper_data)
154                 return;
155
156         helper_data->exit = 1;
157         submit_action(A_EXIT);
158         pthread_join(helper_data->thread, NULL);
159 }
160
161 /* Resets timers and returns the time in milliseconds until the next event. */
162 static int reset_timers(struct interval_timer timer[], int num_timers,
163                         struct timespec *now)
164 {
165         uint32_t msec_to_next_event = INT_MAX;
166         int i;
167
168         for (i = 0; i < num_timers; ++i) {
169                 timer[i].expires = *now;
170                 timespec_add_msec(&timer[i].expires, timer[i].interval_ms);
171                 msec_to_next_event = min_not_zero(msec_to_next_event,
172                                                   timer[i].interval_ms);
173         }
174
175         return msec_to_next_event;
176 }
177
178 /*
179  * Waits for an action from fd during at least timeout_ms. `fd` must be in
180  * non-blocking mode.
181  */
182 static uint8_t wait_for_action(int fd, unsigned int timeout_ms)
183 {
184         struct timeval timeout = {
185                 .tv_sec  = timeout_ms / 1000,
186                 .tv_usec = (timeout_ms % 1000) * 1000,
187         };
188         fd_set rfds, efds;
189         uint8_t action = 0;
190         uint64_t exp;
191         int res;
192
193         res = read_from_pipe(fd, &action, sizeof(action));
194         if (res > 0 || timeout_ms == 0)
195                 return action;
196         FD_ZERO(&rfds);
197         FD_SET(fd, &rfds);
198         FD_ZERO(&efds);
199         FD_SET(fd, &efds);
200 #ifdef CONFIG_HAVE_TIMERFD_CREATE
201         {
202                 /*
203                  * If the timer frequency is 100 Hz, select() will round up
204                  * `timeout` to the next multiple of 1 / 100 Hz = 10 ms. Hence
205                  * use a high-resolution timer if possible to increase
206                  * select() timeout accuracy.
207                  */
208                 struct itimerspec delta = {};
209
210                 delta.it_value.tv_sec = timeout.tv_sec;
211                 delta.it_value.tv_nsec = timeout.tv_usec * 1000;
212                 res = timerfd_settime(timerfd, 0, &delta, NULL);
213                 assert(res == 0);
214                 FD_SET(timerfd, &rfds);
215         }
216 #endif
217         res = select(max(fd, timerfd) + 1, &rfds, NULL, &efds,
218                      timerfd >= 0 ? NULL : &timeout);
219         if (res < 0) {
220                 log_err("fio: select() call in helper thread failed: %s",
221                         strerror(errno));
222                 return A_EXIT;
223         }
224         if (FD_ISSET(fd, &rfds))
225                 read_from_pipe(fd, &action, sizeof(action));
226         if (timerfd >= 0 && FD_ISSET(timerfd, &rfds)) {
227                 res = read(timerfd, &exp, sizeof(exp));
228                 assert(res == sizeof(exp));
229         }
230         return action;
231 }
232
233 /*
234  * Verify whether or not timer @it has expired. If timer @it has expired, call
235  * @it->func(). @now is the current time. @msec_to_next_event is an
236  * input/output parameter that represents the time until the next event.
237  */
238 static int eval_timer(struct interval_timer *it, const struct timespec *now,
239                       unsigned int *msec_to_next_event)
240 {
241         int64_t delta_ms;
242         bool expired;
243
244         /* interval == 0 means that the timer is disabled. */
245         if (it->interval_ms == 0)
246                 return 0;
247
248         delta_ms = rel_time_since(now, &it->expires);
249         expired = delta_ms <= sleep_accuracy_ms;
250         if (expired) {
251                 timespec_add_msec(&it->expires, it->interval_ms);
252                 delta_ms = rel_time_since(now, &it->expires);
253                 if (delta_ms < it->interval_ms - sleep_accuracy_ms ||
254                     delta_ms > it->interval_ms + sleep_accuracy_ms) {
255                         dprint(FD_HELPERTHREAD,
256                                "%s: delta = %" PRIi64 " <> %u. Clock jump?\n",
257                                it->name, delta_ms, it->interval_ms);
258                         delta_ms = it->interval_ms;
259                         it->expires = *now;
260                         timespec_add_msec(&it->expires, it->interval_ms);
261                 }
262         }
263         *msec_to_next_event = min((unsigned int)delta_ms, *msec_to_next_event);
264         return expired ? it->func() : 0;
265 }
266
267 static void *helper_thread_main(void *data)
268 {
269         struct helper_data *hd = data;
270         unsigned int msec_to_next_event, next_log;
271         struct interval_timer timer[] = {
272                 {
273                         .name = "disk_util",
274                         .interval_ms = DISK_UTIL_MSEC,
275                         .func = update_io_ticks,
276                 },
277                 {
278                         .name = "status_interval",
279                         .interval_ms = status_interval,
280                         .func = __show_running_run_stats,
281                 },
282                 {
283                         .name = "steadystate",
284                         .interval_ms = steadystate_enabled ? STEADYSTATE_MSEC :
285                                 0,
286                         .func = steadystate_check,
287                 }
288         };
289         struct timespec ts;
290         long clk_tck;
291         int ret = 0;
292
293         os_clk_tck(&clk_tck);
294
295         dprint(FD_HELPERTHREAD, "clk_tck = %ld\n", clk_tck);
296         assert(clk_tck > 0);
297         sleep_accuracy_ms = (1000 + clk_tck - 1) / clk_tck;
298
299 #ifdef CONFIG_HAVE_TIMERFD_CREATE
300         timerfd = timerfd_create(CLOCK_MONOTONIC, TFD_NONBLOCK);
301         assert(timerfd >= 0);
302         sleep_accuracy_ms = 1;
303 #endif
304
305         sk_out_assign(hd->sk_out);
306
307         /* Let another thread handle signals. */
308         block_signals();
309
310         fio_get_mono_time(&ts);
311         msec_to_next_event = reset_timers(timer, FIO_ARRAY_SIZE(timer), &ts);
312
313         fio_sem_up(hd->startup_sem);
314
315         while (!ret && !hd->exit) {
316                 uint8_t action;
317                 int i;
318
319                 action = wait_for_action(hd->pipe[0], msec_to_next_event);
320                 if (action == A_EXIT)
321                         break;
322
323                 fio_get_mono_time(&ts);
324
325                 msec_to_next_event = INT_MAX;
326
327                 if (action == A_RESET)
328                         msec_to_next_event = reset_timers(timer,
329                                                 FIO_ARRAY_SIZE(timer), &ts);
330
331                 for (i = 0; i < FIO_ARRAY_SIZE(timer); ++i)
332                         ret = eval_timer(&timer[i], &ts, &msec_to_next_event);
333
334                 if (action == A_DO_STAT)
335                         __show_running_run_stats();
336
337                 next_log = calc_log_samples();
338                 if (!next_log)
339                         next_log = DISK_UTIL_MSEC;
340
341                 msec_to_next_event = min(next_log, msec_to_next_event);
342                 dprint(FD_HELPERTHREAD,
343                        "next_log: %u, msec_to_next_event: %u\n",
344                        next_log, msec_to_next_event);
345
346                 if (!is_backend)
347                         print_thread_status();
348         }
349
350         if (timerfd >= 0) {
351                 close(timerfd);
352                 timerfd = -1;
353         }
354
355         fio_writeout_logs(false);
356
357         sk_out_drop();
358         return NULL;
359 }
360
361 /*
362  * Connect two sockets to each other to emulate the pipe() system call on Windows.
363  */
364 int pipe_over_loopback(int fd[2])
365 {
366         struct sockaddr_in addr = { .sin_family = AF_INET };
367         socklen_t len = sizeof(addr);
368         int res;
369
370         addr.sin_addr.s_addr = htonl(INADDR_LOOPBACK);
371
372         sock_init();
373
374         fd[0] = socket(AF_INET, SOCK_STREAM, 0);
375         if (fd[0] < 0)
376                 goto err;
377         fd[1] = socket(AF_INET, SOCK_STREAM, 0);
378         if (fd[1] < 0)
379                 goto close_fd_0;
380         res = bind(fd[0], (struct sockaddr *)&addr, len);
381         if (res < 0)
382                 goto close_fd_1;
383         res = getsockname(fd[0], (struct sockaddr *)&addr, &len);
384         if (res < 0)
385                 goto close_fd_1;
386         res = listen(fd[0], 1);
387         if (res < 0)
388                 goto close_fd_1;
389         res = connect(fd[1], (struct sockaddr *)&addr, len);
390         if (res < 0)
391                 goto close_fd_1;
392         res = accept(fd[0], NULL, NULL);
393         if (res < 0)
394                 goto close_fd_1;
395         close(fd[0]);
396         fd[0] = res;
397         return 0;
398
399 close_fd_1:
400         close(fd[1]);
401
402 close_fd_0:
403         close(fd[0]);
404
405 err:
406         return -1;
407 }
408
409 int helper_thread_create(struct fio_sem *startup_sem, struct sk_out *sk_out)
410 {
411         struct helper_data *hd;
412         int ret;
413
414         hd = scalloc(1, sizeof(*hd));
415
416         setup_disk_util();
417         steadystate_setup();
418
419         hd->sk_out = sk_out;
420
421 #if defined(CONFIG_PIPE2)
422         ret = pipe2(hd->pipe, O_CLOEXEC);
423 #elif defined(CONFIG_PIPE)
424         ret = pipe(hd->pipe);
425 #else
426         ret = pipe_over_loopback(hd->pipe);
427 #endif
428         if (ret)
429                 return 1;
430
431         ret = make_nonblocking(hd->pipe[0]);
432         assert(ret >= 0);
433
434         hd->startup_sem = startup_sem;
435
436         DRD_IGNORE_VAR(helper_data);
437
438         ret = pthread_create(&hd->thread, NULL, helper_thread_main, hd);
439         if (ret) {
440                 log_err("Can't create helper thread: %s\n", strerror(ret));
441                 return 1;
442         }
443
444         helper_data = hd;
445
446         dprint(FD_MUTEX, "wait on startup_sem\n");
447         fio_sem_down(startup_sem);
448         dprint(FD_MUTEX, "done waiting on startup_sem\n");
449         return 0;
450 }