8eee2a2ec6f1fdd0ac9651ad8722131ef6c3dbd6
[fio.git] / engines / glusterfs_async.c
1 /*
2  * glusterfs engine
3  *
4  * IO engine using Glusterfs's gfapi async interface
5  *
6  */
7 #include "gfapi.h"
8
9 struct fio_gf_iou {
10         struct io_u *io_u;
11         int io_complete;
12 };
13
14 static struct io_u *fio_gf_event(struct thread_data *td, int event)
15 {
16         struct gf_data *gf_data = td->io_ops->data;
17         dprint(FD_IO, "%s\n", __FUNCTION__);
18         return gf_data->aio_events[event];
19 }
20
21 static int fio_gf_getevents(struct thread_data *td, unsigned int min,
22                              unsigned int max, struct timespec *t)
23 {
24         struct gf_data *g = td->io_ops->data;
25         unsigned int events = 0;
26         struct io_u *io_u;
27         int i = 0;
28         struct fio_gf_iou *io = NULL;
29
30         dprint(FD_IO, "%s\n", __FUNCTION__);
31         do {
32                 io_u_qiter(&td->io_u_all, io_u, i) {
33                         if (!(io_u->flags & IO_U_F_FLIGHT))
34                                 continue;
35
36                         io = (struct fio_gf_iou *)io_u->engine_data;
37
38                         if (io && io->io_complete) {
39                                 io->io_complete = 0;
40                                 g->aio_events[events] = io_u;
41                                 events++;
42                         }
43
44                 }
45                 if (events < min)
46                         usleep(100);
47                 else
48                         break;
49
50         } while (1);
51
52         return events;
53 }
54
55 #define LAST_POS(f)     ((f)->engine_data)
56 static int fio_gf_prep(struct thread_data *td, struct io_u *io_u)
57 {
58         struct fio_file *f = io_u->file;
59         struct gf_data *g = td->io_ops->data;
60         struct fio_gf_iou *io = NULL;
61
62         dprint(FD_FILE, "fio prep\n");
63
64         io = malloc(sizeof(struct fio_gf_iou));
65     if (!io){
66                 td_verror(td, errno, "malloc");
67                 return 1;
68     }
69         io->io_complete = 0;
70         io->io_u = io_u;
71         io_u->engine_data = io;
72
73         g->aio_events = malloc(td->o.iodepth * sizeof(struct io_u *));
74         if (!g->aio_events){
75                 td_verror(td, errno, "malloc");
76         free(io);
77         return 1;
78     }
79
80         memset(g->aio_events, 0, td->o.iodepth * sizeof(struct io_u *));
81
82         if (!ddir_rw(io_u->ddir))
83                 return 0;
84
85         if (LAST_POS(f) != -1ULL && LAST_POS(f) == io_u->offset)
86                 return 0;
87
88         if (glfs_lseek(g->fd, io_u->offset, SEEK_SET) < 0) {
89                 td_verror(td, errno, "lseek");
90                 return 1;
91         }
92         io = malloc(sizeof(struct fio_gf_iou));
93     if (!io){
94                 td_verror(td, errno, "malloc");
95         return 1;
96     }
97
98         return 0;
99 }
100
101 static void gf_async_cb(glfs_fd_t *fd, ssize_t ret, void *data) 
102 {
103         struct io_u *io_u = (struct io_u *)data;
104         struct fio_gf_iou *iou =
105             (struct fio_gf_iou *)io_u->engine_data;
106
107     dprint(FD_IO, "%s ret %lu\n", __FUNCTION__, ret);    
108     iou->io_complete = 1;
109 }
110
111 static int fio_gf_async_queue(struct thread_data fio_unused *td, struct io_u *io_u)
112 {
113         struct gf_data *g = td->io_ops->data;
114     int r = 0;
115
116         fio_ro_check(td, io_u);
117
118         if (io_u->ddir == DDIR_READ)
119                 r = glfs_pread_async(g->fd, io_u->xfer_buf, io_u->xfer_buflen, io_u->offset,
120                          0, gf_async_cb, (void *)io_u);
121         else if (io_u->ddir == DDIR_WRITE)
122                 r = glfs_pread_async(g->fd, io_u->xfer_buf, io_u->xfer_buflen, io_u->offset,
123                          0, gf_async_cb, (void *)io_u);
124     else if (io_u->ddir == DDIR_SYNC) {
125         r = glfs_fsync_async(g->fd, gf_async_cb, (void *)io_u);
126     }else {         
127         log_err("unsupported operation.\n");
128         io_u->error = -EINVAL;
129         goto failed;
130     }
131     if (r){
132         log_err("glfs failed.\n");
133         io_u->error = r;
134         goto failed;
135     }
136
137         return FIO_Q_QUEUED;
138
139 failed:
140         io_u->error = r;
141         td_verror(td, io_u->error, "xfer");
142         return FIO_Q_COMPLETED;
143 }
144
145
146 static struct ioengine_ops ioengine = {
147         .name               = "gfapi_async",
148         .version            = FIO_IOOPS_VERSION,
149         .init           = fio_gf_setup,
150         .cleanup        = fio_gf_cleanup,
151     .prep           = fio_gf_prep,
152         .queue              = fio_gf_async_queue,
153         .open_file          = fio_gf_open_file,
154         .close_file         = fio_gf_close_file,
155         .get_file_size  = fio_gf_get_file_size,
156         .getevents      = fio_gf_getevents,
157         .event          = fio_gf_event,
158         .options        = gfapi_options,
159         .option_struct_size = sizeof(struct gf_options),
160         .flags              = FIO_DISKLESSIO,
161 };
162
163 static void fio_init fio_gf_register(void)
164 {
165     register_ioengine(&ioengine);
166 }
167
168 static void fio_exit fio_gf_unregister(void)
169 {
170     unregister_ioengine(&ioengine);
171 }