helper_thread: Rework the interval timer code
[fio.git] / helper_thread.c
1 #include <signal.h>
2 #include <unistd.h>
3 #ifdef CONFIG_VALGRIND_DEV
4 #include <valgrind/drd.h>
5 #else
6 #define DRD_IGNORE_VAR(x) do { } while (0)
7 #endif
8
9 #include "fio.h"
10 #include "smalloc.h"
11 #include "helper_thread.h"
12 #include "steadystate.h"
13 #include "pshared.h"
14
15 static int sleep_accuracy_ms;
16
17 enum action {
18         A_EXIT          = 1,
19         A_RESET         = 2,
20         A_DO_STAT       = 3,
21 };
22
23 static struct helper_data {
24         volatile int exit;
25         int pipe[2]; /* 0: read end; 1: write end. */
26         struct sk_out *sk_out;
27         pthread_t thread;
28         struct fio_sem *startup_sem;
29 } *helper_data;
30
31 struct interval_timer {
32         const char      *name;
33         struct timespec expires;
34         uint32_t        interval_ms;
35         int             (*func)(void);
36 };
37
38 void helper_thread_destroy(void)
39 {
40         if (!helper_data)
41                 return;
42
43         close(helper_data->pipe[0]);
44         close(helper_data->pipe[1]);
45         sfree(helper_data);
46 }
47
48 #ifdef _WIN32
49 static void sock_init(void)
50 {
51         WSADATA wsaData;
52         int res;
53
54         /* It is allowed to call WSAStartup() more than once. */
55         res = WSAStartup(MAKEWORD(2, 2), &wsaData);
56         assert(res == 0);
57 }
58
59 static int make_nonblocking(int fd)
60 {
61         unsigned long arg = 1;
62
63         return ioctlsocket(fd, FIONBIO, &arg);
64 }
65
66 static int write_to_pipe(int fd, const void *buf, size_t len)
67 {
68         return send(fd, buf, len, 0);
69 }
70
71 static int read_from_pipe(int fd, void *buf, size_t len)
72 {
73         return recv(fd, buf, len, 0);
74 }
75 #else
76 static void sock_init(void)
77 {
78 }
79
80 static int make_nonblocking(int fd)
81 {
82         return fcntl(fd, F_SETFL, O_NONBLOCK);
83 }
84
85 static int write_to_pipe(int fd, const void *buf, size_t len)
86 {
87         return write(fd, buf, len);
88 }
89
90 static int read_from_pipe(int fd, void *buf, size_t len)
91 {
92         return read(fd, buf, len);
93 }
94 #endif
95
96 static void block_signals(void)
97 {
98 #ifdef HAVE_PTHREAD_SIGMASK
99         sigset_t sigmask;
100
101         ret = pthread_sigmask(SIG_UNBLOCK, NULL, &sigmask);
102         assert(ret == 0);
103         ret = pthread_sigmask(SIG_BLOCK, &sigmask, NULL);
104         assert(ret == 0);
105 #endif
106 }
107
108 static void submit_action(enum action a)
109 {
110         const char data = a;
111         int ret;
112
113         if (!helper_data)
114                 return;
115
116         ret = write_to_pipe(helper_data->pipe[1], &data, sizeof(data));
117         assert(ret == 1);
118 }
119
120 void helper_reset(void)
121 {
122         submit_action(A_RESET);
123 }
124
125 /*
126  * May be invoked in signal handler context and hence must only call functions
127  * that are async-signal-safe. See also
128  * https://pubs.opengroup.org/onlinepubs/9699919799/functions/V2_chap02.html#tag_15_04_03.
129  */
130 void helper_do_stat(void)
131 {
132         submit_action(A_DO_STAT);
133 }
134
135 bool helper_should_exit(void)
136 {
137         if (!helper_data)
138                 return true;
139
140         return helper_data->exit;
141 }
142
143 void helper_thread_exit(void)
144 {
145         if (!helper_data)
146                 return;
147
148         helper_data->exit = 1;
149         submit_action(A_EXIT);
150         pthread_join(helper_data->thread, NULL);
151 }
152
153 /* Resets timers and returns the time in milliseconds until the next event. */
154 static int reset_timers(struct interval_timer timer[], int num_timers,
155                         struct timespec *now)
156 {
157         uint32_t msec_to_next_event = INT_MAX;
158         int i;
159
160         for (i = 0; i < num_timers; ++i) {
161                 timer[i].expires = *now;
162                 timespec_add_msec(&timer[i].expires, timer[i].interval_ms);
163                 msec_to_next_event = min_not_zero(msec_to_next_event,
164                                                   timer[i].interval_ms);
165         }
166
167         return msec_to_next_event;
168 }
169
170 /*
171  * Waits for an action from fd during at least timeout_ms. `fd` must be in
172  * non-blocking mode.
173  */
174 static uint8_t wait_for_action(int fd, unsigned int timeout_ms)
175 {
176         struct timeval timeout = {
177                 .tv_sec  = timeout_ms / 1000,
178                 .tv_usec = (timeout_ms % 1000) * 1000,
179         };
180         fd_set rfds, efds;
181         uint8_t action = 0;
182         int res;
183
184         res = read_from_pipe(fd, &action, sizeof(action));
185         if (res > 0 || timeout_ms == 0)
186                 return action;
187         FD_ZERO(&rfds);
188         FD_SET(fd, &rfds);
189         FD_ZERO(&efds);
190         FD_SET(fd, &efds);
191         res = select(fd + 1, &rfds, NULL, &efds, &timeout);
192         if (res < 0) {
193                 log_err("fio: select() call in helper thread failed: %s",
194                         strerror(errno));
195                 return A_EXIT;
196         }
197         if (FD_ISSET(fd, &rfds))
198                 read_from_pipe(fd, &action, sizeof(action));
199         return action;
200 }
201
202 /*
203  * Verify whether or not timer @it has expired. If timer @it has expired, call
204  * @it->func(). @now is the current time. @msec_to_next_event is an
205  * input/output parameter that represents the time until the next event.
206  */
207 static int eval_timer(struct interval_timer *it, const struct timespec *now,
208                       unsigned int *msec_to_next_event)
209 {
210         int64_t delta_ms;
211         bool expired;
212
213         /* interval == 0 means that the timer is disabled. */
214         if (it->interval_ms == 0)
215                 return 0;
216
217         delta_ms = rel_time_since(now, &it->expires);
218         expired = delta_ms <= sleep_accuracy_ms;
219         if (expired) {
220                 timespec_add_msec(&it->expires, it->interval_ms);
221                 delta_ms = rel_time_since(now, &it->expires);
222                 if (delta_ms < it->interval_ms - sleep_accuracy_ms ||
223                     delta_ms > it->interval_ms + sleep_accuracy_ms) {
224                         dprint(FD_HELPERTHREAD,
225                                "%s: delta = %" PRIi64 " <> %u. Clock jump?\n",
226                                it->name, delta_ms, it->interval_ms);
227                         delta_ms = it->interval_ms;
228                         it->expires = *now;
229                         timespec_add_msec(&it->expires, it->interval_ms);
230                 }
231         }
232         *msec_to_next_event = min((unsigned int)delta_ms, *msec_to_next_event);
233         return expired ? it->func() : 0;
234 }
235
236 static void *helper_thread_main(void *data)
237 {
238         struct helper_data *hd = data;
239         unsigned int msec_to_next_event, next_log;
240         struct interval_timer timer[] = {
241                 {
242                         .name = "disk_util",
243                         .interval_ms = DISK_UTIL_MSEC,
244                         .func = update_io_ticks,
245                 },
246                 {
247                         .name = "status_interval",
248                         .interval_ms = status_interval,
249                         .func = __show_running_run_stats,
250                 },
251                 {
252                         .name = "steadystate",
253                         .interval_ms = steadystate_enabled ? STEADYSTATE_MSEC :
254                                 0,
255                         .func = steadystate_check,
256                 }
257         };
258         struct timespec ts;
259         int clk_tck, ret = 0;
260
261 #ifdef _SC_CLK_TCK
262         clk_tck = sysconf(_SC_CLK_TCK);
263 #else
264         /*
265          * The timer frequence is variable on Windows. Instead of trying to
266          * query it, use 64 Hz, the clock frequency lower bound. See also
267          * https://carpediemsystems.co.uk/2019/07/18/windows-system-timer-granularity/.
268          */
269         clk_tck = 64;
270 #endif
271         dprint(FD_HELPERTHREAD, "clk_tck = %d\n", clk_tck);
272         assert(clk_tck > 0);
273         sleep_accuracy_ms = (1000 + clk_tck - 1) / clk_tck;
274
275         sk_out_assign(hd->sk_out);
276
277         /* Let another thread handle signals. */
278         block_signals();
279
280         fio_get_mono_time(&ts);
281         msec_to_next_event = reset_timers(timer, ARRAY_SIZE(timer), &ts);
282
283         fio_sem_up(hd->startup_sem);
284
285         while (!ret && !hd->exit) {
286                 uint8_t action;
287                 int i;
288
289                 action = wait_for_action(hd->pipe[0], msec_to_next_event);
290                 if (action == A_EXIT)
291                         break;
292
293                 fio_get_mono_time(&ts);
294
295                 msec_to_next_event = INT_MAX;
296
297                 if (action == A_RESET)
298                         msec_to_next_event = reset_timers(timer,
299                                                 ARRAY_SIZE(timer), &ts);
300
301                 for (i = 0; i < ARRAY_SIZE(timer); ++i)
302                         ret = eval_timer(&timer[i], &ts, &msec_to_next_event);
303
304                 if (action == A_DO_STAT)
305                         __show_running_run_stats();
306
307                 next_log = calc_log_samples();
308                 if (!next_log)
309                         next_log = DISK_UTIL_MSEC;
310
311                 msec_to_next_event = min(next_log, msec_to_next_event);
312                 dprint(FD_HELPERTHREAD,
313                        "next_log: %u, msec_to_next_event: %u\n",
314                        next_log, msec_to_next_event);
315
316                 if (!is_backend)
317                         print_thread_status();
318         }
319
320         fio_writeout_logs(false);
321
322         sk_out_drop();
323         return NULL;
324 }
325
326 /*
327  * Connect two sockets to each other to emulate the pipe() system call on Windows.
328  */
329 int pipe_over_loopback(int fd[2])
330 {
331         struct sockaddr_in addr = { .sin_family = AF_INET };
332         socklen_t len = sizeof(addr);
333         int res;
334
335         addr.sin_addr.s_addr = htonl(INADDR_LOOPBACK);
336
337         sock_init();
338
339         fd[0] = socket(AF_INET, SOCK_STREAM, 0);
340         if (fd[0] < 0)
341                 goto err;
342         fd[1] = socket(AF_INET, SOCK_STREAM, 0);
343         if (fd[1] < 0)
344                 goto close_fd_0;
345         res = bind(fd[0], (struct sockaddr *)&addr, len);
346         if (res < 0)
347                 goto close_fd_1;
348         res = getsockname(fd[0], (struct sockaddr *)&addr, &len);
349         if (res < 0)
350                 goto close_fd_1;
351         res = listen(fd[0], 1);
352         if (res < 0)
353                 goto close_fd_1;
354         res = connect(fd[1], (struct sockaddr *)&addr, len);
355         if (res < 0)
356                 goto close_fd_1;
357         res = accept(fd[0], NULL, NULL);
358         if (res < 0)
359                 goto close_fd_1;
360         close(fd[0]);
361         fd[0] = res;
362         return 0;
363
364 close_fd_1:
365         close(fd[1]);
366
367 close_fd_0:
368         close(fd[0]);
369
370 err:
371         return -1;
372 }
373
374 int helper_thread_create(struct fio_sem *startup_sem, struct sk_out *sk_out)
375 {
376         struct helper_data *hd;
377         int ret;
378
379         hd = scalloc(1, sizeof(*hd));
380
381         setup_disk_util();
382         steadystate_setup();
383
384         hd->sk_out = sk_out;
385
386 #if defined(CONFIG_PIPE2)
387         ret = pipe2(hd->pipe, O_CLOEXEC);
388 #elif defined(CONFIG_PIPE)
389         ret = pipe(hd->pipe);
390 #else
391         ret = pipe_over_loopback(hd->pipe);
392 #endif
393         if (ret)
394                 return 1;
395
396         ret = make_nonblocking(hd->pipe[0]);
397         assert(ret >= 0);
398
399         hd->startup_sem = startup_sem;
400
401         DRD_IGNORE_VAR(helper_data);
402
403         ret = pthread_create(&hd->thread, NULL, helper_thread_main, hd);
404         if (ret) {
405                 log_err("Can't create helper thread: %s\n", strerror(ret));
406                 return 1;
407         }
408
409         helper_data = hd;
410
411         dprint(FD_MUTEX, "wait on startup_sem\n");
412         fio_sem_down(startup_sem);
413         dprint(FD_MUTEX, "done waiting on startup_sem\n");
414         return 0;
415 }