Merge branch 'master' of https://github.com/celestinechen/fio
[fio.git] / helper_thread.c
CommitLineData
83276370 1#include <errno.h>
c31092b8 2#include <signal.h>
83276370
DP
3#include <stdio.h>
4#include <string.h>
df9bd7d4 5#include <unistd.h>
696378af
BVA
6#ifdef CONFIG_HAVE_TIMERFD_CREATE
7#include <sys/timerfd.h>
8#endif
4f37732a
BVA
9#ifdef CONFIG_VALGRIND_DEV
10#include <valgrind/drd.h>
11#else
12#define DRD_IGNORE_VAR(x) do { } while (0)
13#endif
14
3d0d549a
BP
15#ifdef WIN32
16#include "os/os-windows.h"
17#endif
18
a39fb9ea
JA
19#include "fio.h"
20#include "smalloc.h"
21#include "helper_thread.h"
16e56d25 22#include "steadystate.h"
ae626d4e 23#include "pshared.h"
a39fb9ea 24
/*
 * Slack in milliseconds within which an interval timer is treated as expired.
 * Assigned by helper_thread_main() before the main loop starts.
 */
static int sleep_accuracy_ms;

/* timerfd used by wait_for_action() for precise timeouts; -1 when not open. */
static int timerfd = -1;
df9bd7d4 27
52a552e2
BVA
/*
 * Commands sent to the helper thread through its pipe, one byte per command.
 * The value 0 is reserved: wait_for_action() returns it when nothing arrived.
 */
enum action {
	A_EXIT		= 1,	/* leave the helper thread main loop */
	A_RESET		= 2,	/* re-arm all interval timers */
	A_DO_STAT	= 3,	/* run __show_running_run_stats() now */
};
33
a39fb9ea
JA
/* State shared between the helper thread and the code that controls it. */
static struct helper_data {
	/* Set to nonzero by helper_thread_exit() to stop the main loop. */
	volatile int exit;
	/* Command pipe: pipe[0] is the read end, pipe[1] the write end. */
	int pipe[2];
	/* Handed to sk_out_assign() by the helper thread when it starts. */
	struct sk_out *sk_out;
	/* Helper thread handle; joined by helper_thread_exit(). */
	pthread_t thread;
	/* Posted by the helper thread once its initialization is complete. */
	struct fio_sem *startup_sem;
} *helper_data;
41
df9bd7d4
BVA
/* A periodic callback driven by the helper thread main loop. */
struct interval_timer {
	const char *name;		/* label used in debug output */
	struct timespec expires;	/* time of the next expiry */
	uint32_t interval_ms;		/* period in ms; 0 disables the timer */
	int (*func)(void);		/* callback; nonzero stops the helper loop */
};
48
a39fb9ea
JA
49void helper_thread_destroy(void)
50{
998e9ebb
BVA
51 if (!helper_data)
52 return;
53
52a552e2
BVA
54 close(helper_data->pipe[0]);
55 close(helper_data->pipe[1]);
a39fb9ea
JA
56 sfree(helper_data);
57}
58
52a552e2
BVA
/*
 * Platform shims for the helper thread's command channel. On Windows the
 * channel is a pair of loopback sockets (see pipe_over_loopback()), so socket
 * APIs are used; elsewhere it is an ordinary pipe.
 */

/* One-time network stack initialization; a no-op outside Windows. */
static void sock_init(void)
{
#ifdef _WIN32
	WSADATA wsa_data;
	/* Calling WSAStartup() more than once is permitted. */
	const int rc = WSAStartup(MAKEWORD(2, 2), &wsa_data);

	assert(rc == 0);
#endif
}

/* Put @fd into non-blocking mode. */
static int make_nonblocking(int fd)
{
#ifdef _WIN32
	unsigned long non_blocking = 1;

	return ioctlsocket(fd, FIONBIO, &non_blocking);
#else
	const int status_flags = O_NONBLOCK;

	return fcntl(fd, F_SETFL, status_flags);
#endif
}

/* Write up to @len bytes from @buf to the channel end @fd. */
static int write_to_pipe(int fd, const void *buf, size_t len)
{
#ifdef _WIN32
	return send(fd, buf, len, 0);
#else
	return write(fd, buf, len);
#endif
}

/* Read up to @len bytes from the channel end @fd into @buf. */
static int read_from_pipe(int fd, void *buf, size_t len)
{
#ifdef _WIN32
	return recv(fd, buf, len, 0);
#else
	return read(fd, buf, len);
#endif
}
106
2575407f
BVA
/*
 * Block asynchronous signals in the calling (helper) thread so that another
 * thread handles them. A signal delivered to this thread would interrupt
 * select() in wait_for_action().
 *
 * Fault-generated signals (SIGSEGV, SIGFPE, SIGILL, SIGBUS) are left
 * unblocked: POSIX leaves the behavior undefined if one of them is generated
 * while blocked.
 */
static void block_signals(void)
{
#ifdef CONFIG_PTHREAD_SIGMASK
	sigset_t sigmask;
	int ret;

	ret = sigfillset(&sigmask);
	assert(ret == 0);
	sigdelset(&sigmask, SIGSEGV);
	sigdelset(&sigmask, SIGFPE);
	sigdelset(&sigmask, SIGILL);
#ifdef SIGBUS
	sigdelset(&sigmask, SIGBUS);
#endif
	ret = pthread_sigmask(SIG_BLOCK, &sigmask, NULL);
	assert(ret == 0);
#endif
}
119
52a552e2 120static void submit_action(enum action a)
a39fb9ea 121{
52a552e2
BVA
122 const char data = a;
123 int ret;
124
dda11987
JA
125 if (!helper_data)
126 return;
127
52a552e2 128 ret = write_to_pipe(helper_data->pipe[1], &data, sizeof(data));
83276370
DP
129 if (ret != 1) {
130 log_err("failed to write action into pipe, err %i:%s", errno, strerror(errno));
131 assert(0);
132 }
52a552e2
BVA
133}
134
135void helper_reset(void)
136{
137 submit_action(A_RESET);
138}
139
140/*
141 * May be invoked in signal handler context and hence must only call functions
142 * that are async-signal-safe. See also
143 * https://pubs.opengroup.org/onlinepubs/9699919799/functions/V2_chap02.html#tag_15_04_03.
144 */
145void helper_do_stat(void)
146{
147 submit_action(A_DO_STAT);
a39fb9ea
JA
148}
149
150bool helper_should_exit(void)
151{
152 if (!helper_data)
153 return true;
154
155 return helper_data->exit;
156}
157
158void helper_thread_exit(void)
159{
998e9ebb
BVA
160 if (!helper_data)
161 return;
162
a39fb9ea 163 helper_data->exit = 1;
52a552e2 164 pthread_join(helper_data->thread, NULL);
a39fb9ea
JA
165}
166
df9bd7d4
BVA
167/* Resets timers and returns the time in milliseconds until the next event. */
168static int reset_timers(struct interval_timer timer[], int num_timers,
169 struct timespec *now)
170{
171 uint32_t msec_to_next_event = INT_MAX;
172 int i;
173
174 for (i = 0; i < num_timers; ++i) {
175 timer[i].expires = *now;
176 timespec_add_msec(&timer[i].expires, timer[i].interval_ms);
177 msec_to_next_event = min_not_zero(msec_to_next_event,
178 timer[i].interval_ms);
179 }
180
181 return msec_to_next_event;
182}
183
335210df
BVA
184/*
185 * Waits for an action from fd during at least timeout_ms. `fd` must be in
186 * non-blocking mode.
187 */
188static uint8_t wait_for_action(int fd, unsigned int timeout_ms)
189{
190 struct timeval timeout = {
191 .tv_sec = timeout_ms / 1000,
192 .tv_usec = (timeout_ms % 1000) * 1000,
193 };
194 fd_set rfds, efds;
195 uint8_t action = 0;
696378af 196 uint64_t exp;
335210df
BVA
197 int res;
198
199 res = read_from_pipe(fd, &action, sizeof(action));
200 if (res > 0 || timeout_ms == 0)
201 return action;
202 FD_ZERO(&rfds);
203 FD_SET(fd, &rfds);
204 FD_ZERO(&efds);
205 FD_SET(fd, &efds);
696378af
BVA
206#ifdef CONFIG_HAVE_TIMERFD_CREATE
207 {
208 /*
209 * If the timer frequency is 100 Hz, select() will round up
210 * `timeout` to the next multiple of 1 / 100 Hz = 10 ms. Hence
211 * use a high-resolution timer if possible to increase
212 * select() timeout accuracy.
213 */
214 struct itimerspec delta = {};
215
216 delta.it_value.tv_sec = timeout.tv_sec;
217 delta.it_value.tv_nsec = timeout.tv_usec * 1000;
218 res = timerfd_settime(timerfd, 0, &delta, NULL);
219 assert(res == 0);
220 FD_SET(timerfd, &rfds);
221 }
222#endif
223 res = select(max(fd, timerfd) + 1, &rfds, NULL, &efds,
224 timerfd >= 0 ? NULL : &timeout);
335210df
BVA
225 if (res < 0) {
226 log_err("fio: select() call in helper thread failed: %s",
227 strerror(errno));
228 return A_EXIT;
229 }
230 if (FD_ISSET(fd, &rfds))
231 read_from_pipe(fd, &action, sizeof(action));
696378af
BVA
232 if (timerfd >= 0 && FD_ISSET(timerfd, &rfds)) {
233 res = read(timerfd, &exp, sizeof(exp));
234 assert(res == sizeof(exp));
235 }
335210df
BVA
236 return action;
237}
238
df9bd7d4
BVA
239/*
240 * Verify whether or not timer @it has expired. If timer @it has expired, call
241 * @it->func(). @now is the current time. @msec_to_next_event is an
242 * input/output parameter that represents the time until the next event.
243 */
244static int eval_timer(struct interval_timer *it, const struct timespec *now,
245 unsigned int *msec_to_next_event)
3da71e37 246{
df9bd7d4
BVA
247 int64_t delta_ms;
248 bool expired;
249
250 /* interval == 0 means that the timer is disabled. */
251 if (it->interval_ms == 0)
252 return 0;
253
254 delta_ms = rel_time_since(now, &it->expires);
255 expired = delta_ms <= sleep_accuracy_ms;
256 if (expired) {
257 timespec_add_msec(&it->expires, it->interval_ms);
258 delta_ms = rel_time_since(now, &it->expires);
259 if (delta_ms < it->interval_ms - sleep_accuracy_ms ||
260 delta_ms > it->interval_ms + sleep_accuracy_ms) {
261 dprint(FD_HELPERTHREAD,
262 "%s: delta = %" PRIi64 " <> %u. Clock jump?\n",
263 it->name, delta_ms, it->interval_ms);
264 delta_ms = it->interval_ms;
265 it->expires = *now;
266 timespec_add_msec(&it->expires, it->interval_ms);
267 }
268 }
269 *msec_to_next_event = min((unsigned int)delta_ms, *msec_to_next_event);
270 return expired ? it->func() : 0;
3da71e37
VF
271}
272
a39fb9ea
JA
273static void *helper_thread_main(void *data)
274{
275 struct helper_data *hd = data;
df9bd7d4
BVA
276 unsigned int msec_to_next_event, next_log;
277 struct interval_timer timer[] = {
278 {
279 .name = "disk_util",
280 .interval_ms = DISK_UTIL_MSEC,
281 .func = update_io_ticks,
282 },
283 {
284 .name = "status_interval",
285 .interval_ms = status_interval,
286 .func = __show_running_run_stats,
287 },
288 {
289 .name = "steadystate",
90e678ba 290 .interval_ms = steadystate_enabled ? ss_check_interval :
df9bd7d4
BVA
291 0,
292 .func = steadystate_check,
293 }
294 };
295 struct timespec ts;
3d0d549a
BP
296 long clk_tck;
297 int ret = 0;
df9bd7d4 298
3d0d549a
BP
299 os_clk_tck(&clk_tck);
300
301 dprint(FD_HELPERTHREAD, "clk_tck = %ld\n", clk_tck);
df9bd7d4
BVA
302 assert(clk_tck > 0);
303 sleep_accuracy_ms = (1000 + clk_tck - 1) / clk_tck;
a39fb9ea 304
696378af
BVA
305#ifdef CONFIG_HAVE_TIMERFD_CREATE
306 timerfd = timerfd_create(CLOCK_MONOTONIC, TFD_NONBLOCK);
307 assert(timerfd >= 0);
308 sleep_accuracy_ms = 1;
309#endif
310
a39fb9ea
JA
311 sk_out_assign(hd->sk_out);
312
c31092b8 313 /* Let another thread handle signals. */
2575407f 314 block_signals();
c31092b8 315
69212fc4 316 fio_get_mono_time(&ts);
59f94d26 317 msec_to_next_event = reset_timers(timer, FIO_ARRAY_SIZE(timer), &ts);
a39fb9ea 318
971caeb1 319 fio_sem_up(hd->startup_sem);
a39fb9ea 320
a39fb9ea 321 while (!ret && !hd->exit) {
335210df 322 uint8_t action;
df9bd7d4 323 int i;
335210df
BVA
324
325 action = wait_for_action(hd->pipe[0], msec_to_next_event);
326 if (action == A_EXIT)
327 break;
a39fb9ea 328
69212fc4 329 fio_get_mono_time(&ts);
a39fb9ea 330
df9bd7d4
BVA
331 msec_to_next_event = INT_MAX;
332
333 if (action == A_RESET)
334 msec_to_next_event = reset_timers(timer,
59f94d26 335 FIO_ARRAY_SIZE(timer), &ts);
a39fb9ea 336
59f94d26 337 for (i = 0; i < FIO_ARRAY_SIZE(timer); ++i)
df9bd7d4 338 ret = eval_timer(&timer[i], &ts, &msec_to_next_event);
a39fb9ea 339
52a552e2 340 if (action == A_DO_STAT)
a39fb9ea 341 __show_running_run_stats();
a39fb9ea
JA
342
343 next_log = calc_log_samples();
344 if (!next_log)
345 next_log = DISK_UTIL_MSEC;
346
3da71e37 347 msec_to_next_event = min(next_log, msec_to_next_event);
df9bd7d4
BVA
348 dprint(FD_HELPERTHREAD,
349 "next_log: %u, msec_to_next_event: %u\n",
350 next_log, msec_to_next_event);
a39fb9ea
JA
351
352 if (!is_backend)
353 print_thread_status();
354 }
355
696378af
BVA
356 if (timerfd >= 0) {
357 close(timerfd);
358 timerfd = -1;
359 }
360
a39fb9ea
JA
361 fio_writeout_logs(false);
362
363 sk_out_drop();
364 return NULL;
365}
366
52a552e2
BVA
/*
 * Emulate the pipe() system call (needed on Windows) with a connected pair of
 * loopback TCP sockets.
 *
 * A listener is bound to an ephemeral loopback port and fd[1] connects to it.
 * fd[0] is then replaced by the accepted connection.
 *
 * Returns 0 on success and -1 on failure. On failure every socket created
 * here has been closed.
 */
int pipe_over_loopback(int fd[2])
{
	struct sockaddr_in addr = { .sin_family = AF_INET };
	socklen_t addr_len = sizeof(addr);
	int accepted;

	addr.sin_addr.s_addr = htonl(INADDR_LOOPBACK);

	sock_init();

	fd[0] = socket(AF_INET, SOCK_STREAM, 0);
	if (fd[0] < 0)
		return -1;

	fd[1] = socket(AF_INET, SOCK_STREAM, 0);
	if (fd[1] < 0) {
		close(fd[0]);
		return -1;
	}

	/* getsockname() reports the ephemeral port chosen by bind(). */
	if (bind(fd[0], (struct sockaddr *)&addr, addr_len) < 0 ||
	    getsockname(fd[0], (struct sockaddr *)&addr, &addr_len) < 0 ||
	    listen(fd[0], 1) < 0 ||
	    connect(fd[1], (struct sockaddr *)&addr, addr_len) < 0)
		goto fail;

	accepted = accept(fd[0], NULL, NULL);
	if (accepted < 0)
		goto fail;

	/* The listener is no longer needed once the connection is accepted. */
	close(fd[0]);
	fd[0] = accepted;
	return 0;

fail:
	close(fd[1]);
	close(fd[0]);
	return -1;
}
414
971caeb1 415int helper_thread_create(struct fio_sem *startup_sem, struct sk_out *sk_out)
a39fb9ea
JA
416{
417 struct helper_data *hd;
418 int ret;
419
b3090ff4 420 hd = scalloc(1, sizeof(*hd));
a39fb9ea
JA
421
422 setup_disk_util();
16e56d25 423 steadystate_setup();
a39fb9ea
JA
424
425 hd->sk_out = sk_out;
34febb23 426
52a552e2
BVA
427#if defined(CONFIG_PIPE2)
428 ret = pipe2(hd->pipe, O_CLOEXEC);
429#elif defined(CONFIG_PIPE)
430 ret = pipe(hd->pipe);
431#else
432 ret = pipe_over_loopback(hd->pipe);
433#endif
34febb23 434 if (ret)
f9e5b5ee 435 return 1;
34febb23 436
52a552e2
BVA
437 ret = make_nonblocking(hd->pipe[0]);
438 assert(ret >= 0);
439
971caeb1 440 hd->startup_sem = startup_sem;
a39fb9ea 441
4f37732a
BVA
442 DRD_IGNORE_VAR(helper_data);
443
a39fb9ea
JA
444 ret = pthread_create(&hd->thread, NULL, helper_thread_main, hd);
445 if (ret) {
446 log_err("Can't create helper thread: %s\n", strerror(ret));
447 return 1;
448 }
449
450 helper_data = hd;
451
971caeb1
BVA
452 dprint(FD_MUTEX, "wait on startup_sem\n");
453 fio_sem_down(startup_sem);
454 dprint(FD_MUTEX, "done waiting on startup_sem\n");
a39fb9ea
JA
455 return 0;
456}