4b1b263691f5825ef247b38a564a8c66f6d4f3cb
[fio.git] / engines / syslet-rw.c
1 /*
2  * read/write() engine that uses syslet to be async
3  *
4  */
5 #include <stdio.h>
6 #include <stdlib.h>
7 #include <unistd.h>
8 #include <errno.h>
9 #include <assert.h>
10
11 #include "../fio.h"
12 #include "../os.h"
13
14 #ifdef FIO_HAVE_SYSLET
15
16 struct syslet_data {
17         struct io_u **events;
18         unsigned int nr_events;
19         
20         struct async_head_user ahu;
21         struct syslet_uatom **ring;
22
23         struct syslet_uatom *head, *tail;
24 };
25
26 static void fio_syslet_complete_atom(struct thread_data *td,
27                                      struct syslet_uatom *atom)
28 {
29         struct syslet_data *sd = td->io_ops->data;
30         struct syslet_uatom *last;
31         struct io_u *io_u;
32
33         /*
34          * complete from the beginning of the sequence up to (and
35          * including) this atom
36          */
37         last = atom;
38         io_u = atom->private;
39         atom = io_u->req.head;
40
41         /*
42          * now complete in right order
43          */
44         do {
45                 long ret;
46
47                 io_u = atom->private;
48                 ret = *atom->ret_ptr;
49                 if (ret > 0)
50                         io_u->resid = io_u->xfer_buflen - ret;
51                 else if (ret < 0)
52                         io_u->error = ret;
53
54                 assert(sd->nr_events < td->iodepth);
55                 sd->events[sd->nr_events++] = io_u;
56
57                 if (atom == last)
58                         break;
59
60                 atom = atom->next;
61         } while (1);
62
63         assert(!last->next);
64 }
65
66 /*
67  * Inspect the ring to see if we have completed events
68  */
69 static void fio_syslet_complete(struct thread_data *td)
70 {
71         struct syslet_data *sd = td->io_ops->data;
72
73         do {
74                 struct syslet_uatom *atom;
75
76                 atom = sd->ring[sd->ahu.user_ring_idx];
77                 if (!atom)
78                         break;
79
80                 sd->ring[sd->ahu.user_ring_idx] = NULL;
81                 if (++sd->ahu.user_ring_idx == td->iodepth)
82                         sd->ahu.user_ring_idx = 0;
83
84                 fio_syslet_complete_atom(td, atom);
85         } while (1);
86 }
87
88 static int fio_syslet_getevents(struct thread_data *td, int min,
89                                 int fio_unused max,
90                                 struct timespec fio_unused *t)
91 {
92         struct syslet_data *sd = td->io_ops->data;
93         long ret;
94
95         do {
96                 fio_syslet_complete(td);
97
98                 /*
99                  * do we have enough immediate completions?
100                  */
101                 if (sd->nr_events >= (unsigned int) min)
102                         break;
103
104                 /*
105                  * OK, we need to wait for some events...
106                  */
107                 ret = async_wait(1, sd->ahu.user_ring_idx, &sd->ahu);
108                 if (ret < 0)
109                         return -errno;
110         } while (1);
111
112         ret = sd->nr_events;
113         sd->nr_events = 0;
114         return ret;
115 }
116
117 static struct io_u *fio_syslet_event(struct thread_data *td, int event)
118 {
119         struct syslet_data *sd = td->io_ops->data;
120
121         return sd->events[event];
122 }
123
124 static void init_atom(struct syslet_uatom *atom, int nr, void *arg0,
125                       void *arg1, void *arg2, void *arg3, void *ret_ptr,
126                       unsigned long flags, void *priv)
127 {
128         atom->flags = flags;
129         atom->nr = nr;
130         atom->ret_ptr = ret_ptr;
131         atom->next = NULL;
132         atom->arg_ptr[0] = arg0;
133         atom->arg_ptr[1] = arg1;
134         atom->arg_ptr[2] = arg2;
135         atom->arg_ptr[3] = arg3;
136         atom->arg_ptr[4] = atom->arg_ptr[5] = NULL;
137         atom->private = priv;
138 }
139
140 /*
141  * Use seek atom for sync
142  */
143 static void fio_syslet_prep_sync(struct io_u *io_u, struct fio_file *f)
144 {
145         init_atom(&io_u->req.atom, __NR_fsync, &f->fd, NULL, NULL, NULL,
146                   &io_u->req.ret, 0, io_u);
147 }
148
149 static void fio_syslet_prep_rw(struct io_u *io_u, struct fio_file *f)
150 {
151         int nr;
152
153         /*
154          * prepare rw
155          */
156         if (io_u->ddir == DDIR_READ)
157                 nr = __NR_pread64;
158         else
159                 nr = __NR_pwrite64;
160
161         init_atom(&io_u->req.atom, nr, &f->fd, &io_u->xfer_buf,
162                   &io_u->xfer_buflen, &io_u->offset, &io_u->req.ret, 0, io_u);
163 }
164
165 static int fio_syslet_prep(struct thread_data fio_unused *td, struct io_u *io_u)
166 {
167         struct fio_file *f = io_u->file;
168
169         if (io_u->ddir == DDIR_SYNC)
170                 fio_syslet_prep_sync(io_u, f);
171         else
172                 fio_syslet_prep_rw(io_u, f);
173
174         return 0;
175 }
176
177 static void cachemiss_thread_start(void)
178 {
179         while (1)
180                 async_thread(NULL, NULL);
181 }
182
183 #define THREAD_STACK_SIZE (16384)
184
185 static unsigned long thread_stack_alloc()
186 {
187         return (unsigned long) malloc(THREAD_STACK_SIZE) + THREAD_STACK_SIZE;
188 }
189
190 static int fio_syslet_commit(struct thread_data *td)
191 {
192         struct syslet_data *sd = td->io_ops->data;
193         struct syslet_uatom *done;
194
195         if (!sd->head)
196                 return 0;
197
198         assert(!sd->tail->next);
199
200         if (!sd->ahu.new_thread_stack)
201                 sd->ahu.new_thread_stack = thread_stack_alloc();
202
203         /*
204          * On sync completion, the atom is returned. So on NULL return
205          * it's queued asynchronously.
206          */
207         done = async_exec(sd->head, &sd->ahu);
208
209         sd->head = sd->tail = NULL;
210
211         if (done)
212                 fio_syslet_complete_atom(td, done);
213
214         return 0;
215 }
216
217 static int fio_syslet_queue(struct thread_data *td, struct io_u *io_u)
218 {
219         struct syslet_data *sd = td->io_ops->data;
220
221         if (sd->tail) {
222                 sd->tail->next = &io_u->req.atom;
223                 sd->tail = &io_u->req.atom;
224         } else
225                 sd->head = sd->tail = &io_u->req.atom;
226
227         io_u->req.head = sd->head;
228         return FIO_Q_QUEUED;
229 }
230
231 static int async_head_init(struct syslet_data *sd, unsigned int depth)
232 {
233         unsigned long ring_size;
234
235         memset(&sd->ahu, 0, sizeof(struct async_head_user));
236
237         ring_size = sizeof(struct syslet_uatom *) * depth;
238         sd->ring = malloc(ring_size);
239         memset(sd->ring, 0, ring_size);
240
241         sd->ahu.user_ring_idx = 0;
242         sd->ahu.completion_ring = sd->ring;
243         sd->ahu.ring_size_bytes = ring_size;
244         sd->ahu.head_stack = thread_stack_alloc();
245         sd->ahu.head_eip = (unsigned long) cachemiss_thread_start;
246         sd->ahu.new_thread_eip = (unsigned long) cachemiss_thread_start;
247
248         return 0;
249 }
250
251 static void async_head_exit(struct syslet_data *sd)
252 {
253         free(sd->ring);
254 }
255
256 static void fio_syslet_cleanup(struct thread_data *td)
257 {
258         struct syslet_data *sd = td->io_ops->data;
259
260         if (sd) {
261                 async_head_exit(sd);
262                 free(sd->events);
263                 free(sd);
264                 td->io_ops->data = NULL;
265         }
266 }
267
268 static int fio_syslet_init(struct thread_data *td)
269 {
270         struct syslet_data *sd;
271
272
273         sd = malloc(sizeof(*sd));
274         memset(sd, 0, sizeof(*sd));
275         sd->events = malloc(sizeof(struct io_u *) * td->iodepth);
276         memset(sd->events, 0, sizeof(struct io_u *) * td->iodepth);
277
278         /*
279          * This will handily fail for kernels where syslet isn't available
280          */
281         if (async_head_init(sd, td->iodepth)) {
282                 free(sd->events);
283                 free(sd);
284                 return 1;
285         }
286
287         td->io_ops->data = sd;
288         return 0;
289 }
290
291 static struct ioengine_ops ioengine = {
292         .name           = "syslet-rw",
293         .version        = FIO_IOOPS_VERSION,
294         .init           = fio_syslet_init,
295         .prep           = fio_syslet_prep,
296         .queue          = fio_syslet_queue,
297         .commit         = fio_syslet_commit,
298         .getevents      = fio_syslet_getevents,
299         .event          = fio_syslet_event,
300         .cleanup        = fio_syslet_cleanup,
301 };
302
303 #else /* FIO_HAVE_SYSLET */
304
305 /*
306  * When we have a proper configure system in place, we simply wont build
307  * and install this io engine. For now install a crippled version that
308  * just complains and fails to load.
309  */
310 static int fio_syslet_init(struct thread_data fio_unused *td)
311 {
312         fprintf(stderr, "fio: syslet not available\n");
313         return 1;
314 }
315
316 static struct ioengine_ops ioengine = {
317         .name           = "syslet-rw",
318         .version        = FIO_IOOPS_VERSION,
319         .init           = fio_syslet_init,
320 };
321
322 #endif /* FIO_HAVE_SYSLET */
323
324 static void fio_init fio_syslet_register(void)
325 {
326         register_ioengine(&ioengine);
327 }
328
329 static void fio_exit fio_syslet_unregister(void)
330 {
331         unregister_ioengine(&ioengine);
332 }