First cut at doing sequences of atoms in one commit
[fio.git] / engines / syslet-rw.c
1 /*
2  * read/write() engine that uses syslet to be async
3  *
4  */
5 #include <stdio.h>
6 #include <stdlib.h>
7 #include <unistd.h>
8 #include <errno.h>
9 #include <assert.h>
10
11 #include "../fio.h"
12 #include "../os.h"
13
14 #ifdef FIO_HAVE_SYSLET
15
16 struct syslet_data {
17         struct io_u **events;
18         unsigned int nr_events;
19         
20         struct async_head_user ahu;
21         struct syslet_uatom **ring;
22
23         struct syslet_uatom *head, *tail;
24         struct syslet_uatom **event_map;
25         unsigned int event_map_idx;
26 };
27
28 static void fio_syslet_complete_atom(struct thread_data *td,
29                                      struct syslet_uatom *atom)
30 {
31         struct syslet_data *sd = td->io_ops->data;
32         struct io_u *io_u;
33         int i, end;
34
35         if (!sd->event_map_idx)
36                 return;
37
38         /*
39          * Find the start of the string of atoms for this sequence
40          */
41         for (end = sd->event_map_idx - 1; end >= 0; end--)
42                 if (atom == sd->event_map[end])
43                         break;
44
45         if (end < 0 || atom != sd->event_map[end]) {
46                 printf("didn't find atom\n");
47                 return;
48         }
49
50         //printf("end=%d, total %d\n", end, sd->event_map_idx);
51
52         /*
53          * now complete in right order
54          */
55         for (i = 0; i <= end; i++) {
56                 long ret;
57
58                 atom = sd->event_map[i];
59                 io_u = atom->private;
60                 ret = *atom->ret_ptr;
61                 if (ret > 0)
62                         io_u->resid = io_u->xfer_buflen - ret;
63                 else if (ret < 0)
64                         io_u->error = ret;
65
66                 assert(sd->nr_events < td->iodepth);
67                 sd->events[sd->nr_events++] = io_u;
68         }
69
70         /*
71          * Move later completions to the front, if we didn't complete all
72          */
73         if (end == (int) sd->event_map_idx - 1)
74                 sd->event_map_idx = 0;
75         else {
76                 int nr = sd->event_map_idx - end - 1;
77
78                 memmove(sd->event_map, &sd->event_map[end + 1], nr * sizeof(struct syslet_uatom *));
79                 sd->event_map_idx = nr;
80         }
81 }
82
83 /*
84  * Inspect the ring to see if we have completed events
85  */
86 static void fio_syslet_complete(struct thread_data *td)
87 {
88         struct syslet_data *sd = td->io_ops->data;
89
90         do {
91                 struct syslet_uatom *atom;
92
93                 atom = sd->ring[sd->ahu.user_ring_idx];
94                 if (!atom)
95                         break;
96
97                 sd->ring[sd->ahu.user_ring_idx] = NULL;
98                 if (++sd->ahu.user_ring_idx == td->iodepth)
99                         sd->ahu.user_ring_idx = 0;
100
101                 fio_syslet_complete_atom(td, atom);
102         } while (1);
103 }
104
105 static int fio_syslet_getevents(struct thread_data *td, int min,
106                                 int fio_unused max,
107                                 struct timespec fio_unused *t)
108 {
109         struct syslet_data *sd = td->io_ops->data;
110         long ret;
111
112         do {
113                 fio_syslet_complete(td);
114
115                 /*
116                  * do we have enough immediate completions?
117                  */
118                 if (sd->nr_events >= (unsigned int) min)
119                         break;
120
121                 /*
122                  * OK, we need to wait for some events...
123                  */
124                 ret = async_wait(1, sd->ahu.user_ring_idx, &sd->ahu);
125                 if (ret < 0)
126                         return -errno;
127         } while (1);
128
129         ret = sd->nr_events;
130         sd->nr_events = 0;
131         return ret;
132 }
133
134 static struct io_u *fio_syslet_event(struct thread_data *td, int event)
135 {
136         struct syslet_data *sd = td->io_ops->data;
137
138         return sd->events[event];
139 }
140
141 static void init_atom(struct syslet_uatom *atom, int nr, void *arg0,
142                       void *arg1, void *arg2, void *arg3, void *ret_ptr,
143                       unsigned long flags, void *priv)
144 {
145         atom->flags = flags;
146         atom->nr = nr;
147         atom->ret_ptr = ret_ptr;
148         atom->next = NULL;
149         atom->arg_ptr[0] = arg0;
150         atom->arg_ptr[1] = arg1;
151         atom->arg_ptr[2] = arg2;
152         atom->arg_ptr[3] = arg3;
153         atom->arg_ptr[4] = atom->arg_ptr[5] = NULL;
154         atom->private = priv;
155 }
156
157 /*
158  * Use seek atom for sync
159  */
160 static void fio_syslet_prep_sync(struct io_u *io_u, struct fio_file *f)
161 {
162         init_atom(&io_u->req.atom, __NR_fsync, &f->fd, NULL, NULL, NULL,
163                   &io_u->req.ret, 0, io_u);
164 }
165
166 static void fio_syslet_prep_rw(struct io_u *io_u, struct fio_file *f)
167 {
168         int nr;
169
170         /*
171          * prepare rw
172          */
173         if (io_u->ddir == DDIR_READ)
174                 nr = __NR_pread64;
175         else
176                 nr = __NR_pwrite64;
177
178         init_atom(&io_u->req.atom, nr, &f->fd, &io_u->xfer_buf,
179                   &io_u->xfer_buflen, &io_u->offset, &io_u->req.ret, 0, io_u);
180 }
181
182 static int fio_syslet_prep(struct thread_data fio_unused *td, struct io_u *io_u)
183 {
184         struct fio_file *f = io_u->file;
185
186         if (io_u->ddir == DDIR_SYNC)
187                 fio_syslet_prep_sync(io_u, f);
188         else
189                 fio_syslet_prep_rw(io_u, f);
190
191         return 0;
192 }
193
194 static void cachemiss_thread_start(void)
195 {
196         while (1)
197                 async_thread();
198 }
199
200 #define THREAD_STACK_SIZE (16384)
201
202 static unsigned long thread_stack_alloc()
203 {
204         return (unsigned long)malloc(THREAD_STACK_SIZE) + THREAD_STACK_SIZE;
205 }
206
207 static int fio_syslet_commit(struct thread_data *td)
208 {
209         struct syslet_data *sd = td->io_ops->data;
210         struct syslet_uatom *done;
211
212         if (!sd->head)
213                 return 0;
214
215         if (!sd->ahu.new_thread_stack)
216                 sd->ahu.new_thread_stack = thread_stack_alloc();
217
218         /*
219          * On sync completion, the atom is returned. So on NULL return
220          * it's queued asynchronously.
221          */
222         done = async_exec(sd->head, &sd->ahu);
223
224         sd->head = sd->tail = NULL;
225
226         if (done)
227                 fio_syslet_complete_atom(td, done);
228
229         return 0;
230 }
231
232 static int fio_syslet_queue(struct thread_data *td, struct io_u *io_u)
233 {
234         struct syslet_data *sd = td->io_ops->data;
235
236         if (sd->tail) {
237                 sd->tail->next = &io_u->req.atom;
238                 sd->tail = &io_u->req.atom;
239         } else
240                 sd->head = sd->tail = &io_u->req.atom;
241
242         sd->event_map[sd->event_map_idx++] = sd->tail;
243         return FIO_Q_QUEUED;
244 }
245
246 static int async_head_init(struct syslet_data *sd, unsigned int depth)
247 {
248         unsigned long ring_size;
249
250         memset(&sd->ahu, 0, sizeof(struct async_head_user));
251
252         ring_size = sizeof(struct syslet_uatom *) * depth;
253         sd->ring = malloc(ring_size);
254         memset(sd->ring, 0, ring_size);
255
256         sd->ahu.user_ring_idx = 0;
257         sd->ahu.completion_ring = sd->ring;
258         sd->ahu.ring_size_bytes = ring_size;
259         sd->ahu.head_stack = thread_stack_alloc();
260         sd->ahu.head_eip = (unsigned long)cachemiss_thread_start;
261         sd->ahu.new_thread_eip = (unsigned long)cachemiss_thread_start;
262
263         return 0;
264 }
265
266 static void async_head_exit(struct syslet_data *sd)
267 {
268         free(sd->ring);
269 }
270
271 static void fio_syslet_cleanup(struct thread_data *td)
272 {
273         struct syslet_data *sd = td->io_ops->data;
274
275         if (sd) {
276                 async_head_exit(sd);
277                 free(sd->events);
278                 free(sd->event_map);
279                 free(sd);
280                 td->io_ops->data = NULL;
281         }
282 }
283
284 static int fio_syslet_init(struct thread_data *td)
285 {
286         struct syslet_data *sd;
287
288
289         sd = malloc(sizeof(*sd));
290         memset(sd, 0, sizeof(*sd));
291         sd->events = malloc(sizeof(struct io_u *) * td->iodepth);
292         memset(sd->events, 0, sizeof(struct io_u *) * td->iodepth);
293         sd->event_map = malloc(sizeof(struct syslet_uatom *) * td->iodepth);
294         memset(sd->event_map, 0, sizeof(struct syslet_uatom *) * td->iodepth);
295
296         /*
297          * This will handily fail for kernels where syslet isn't available
298          */
299         if (async_head_init(sd, td->iodepth)) {
300                 free(sd->events);
301                 free(sd);
302                 return 1;
303         }
304
305         td->io_ops->data = sd;
306         return 0;
307 }
308
309 static struct ioengine_ops ioengine = {
310         .name           = "syslet-rw",
311         .version        = FIO_IOOPS_VERSION,
312         .init           = fio_syslet_init,
313         .prep           = fio_syslet_prep,
314         .queue          = fio_syslet_queue,
315         .commit         = fio_syslet_commit,
316         .getevents      = fio_syslet_getevents,
317         .event          = fio_syslet_event,
318         .cleanup        = fio_syslet_cleanup,
319 };
320
321 #else /* FIO_HAVE_SYSLET */
322
323 /*
324  * When we have a proper configure system in place, we simply wont build
325  * and install this io engine. For now install a crippled version that
326  * just complains and fails to load.
327  */
328 static int fio_syslet_init(struct thread_data fio_unused *td)
329 {
330         fprintf(stderr, "fio: syslet not available\n");
331         return 1;
332 }
333
334 static struct ioengine_ops ioengine = {
335         .name           = "syslet-rw",
336         .version        = FIO_IOOPS_VERSION,
337         .init           = fio_syslet_init,
338 };
339
340 #endif /* FIO_HAVE_SYSLET */
341
342 static void fio_init fio_syslet_register(void)
343 {
344         register_ioengine(&ioengine);
345 }
346
347 static void fio_exit fio_syslet_unregister(void)
348 {
349         unregister_ioengine(&ioengine);
350 }