fio: add libpmem engine
[fio.git] / engines / rdma.c
1 /*
2  * RDMA I/O engine
3  *
4  * RDMA I/O engine based on the IB verbs and RDMA/CM user space libraries.
5  * Supports both RDMA memory semantics and channel semantics
6  *   for the InfiniBand, RoCE and iWARP protocols.
7  *
8  * You will need the Linux RDMA software installed, either
9  * from your Linux distributor or directly from openfabrics.org:
10  *
11  * http://www.openfabrics.org/downloads/OFED/
12  *
13  * Exchanging steps of RDMA ioengine control messages:
14  *      1. client side sends test mode (RDMA_WRITE/RDMA_READ/SEND)
15  *         to server side.
16  *      2. server side parses test mode, and sends back confirmation
17  *         to client side. In RDMA WRITE/READ test, this confirmation
18  *         includes memory information, such as rkey, address.
19  *      3. client side initiates test loop.
20  *      4. In RDMA WRITE/READ test, client side sends a completion
21  *         notification to server side. Server side updates its
22  *         td->done as true.
23  *
24  */
25 #include <stdio.h>
26 #include <stdlib.h>
27 #include <unistd.h>
28 #include <errno.h>
29 #include <assert.h>
30 #include <netinet/in.h>
31 #include <arpa/inet.h>
32 #include <netdb.h>
33 #include <sys/poll.h>
34 #include <sys/types.h>
35 #include <sys/socket.h>
36 #include <sys/time.h>
37 #include <sys/resource.h>
38
39 #include <pthread.h>
40 #include <inttypes.h>
41
42 #include "../fio.h"
43 #include "../hash.h"
44 #include "../optgroup.h"
45
46 #include <rdma/rdma_cma.h>
47
48 #define FIO_RDMA_MAX_IO_DEPTH    512
49
50 enum rdma_io_mode {
51         FIO_RDMA_UNKNOWN = 0,
52         FIO_RDMA_MEM_WRITE,
53         FIO_RDMA_MEM_READ,
54         FIO_RDMA_CHA_SEND,
55         FIO_RDMA_CHA_RECV
56 };
57
58 struct rdmaio_options {
59         struct thread_data *td;
60         unsigned int port;
61         enum rdma_io_mode verb;
62 };
63
64 static int str_hostname_cb(void *data, const char *input)
65 {
66         struct rdmaio_options *o = data;
67
68         if (o->td->o.filename)
69                 free(o->td->o.filename);
70         o->td->o.filename = strdup(input);
71         return 0;
72 }
73
74 static struct fio_option options[] = {
75         {
76                 .name   = "hostname",
77                 .lname  = "rdma engine hostname",
78                 .type   = FIO_OPT_STR_STORE,
79                 .cb     = str_hostname_cb,
80                 .help   = "Hostname for RDMA IO engine",
81                 .category = FIO_OPT_C_ENGINE,
82                 .group  = FIO_OPT_G_RDMA,
83         },
84         {
85                 .name   = "port",
86                 .lname  = "rdma engine port",
87                 .type   = FIO_OPT_INT,
88                 .off1   = offsetof(struct rdmaio_options, port),
89                 .minval = 1,
90                 .maxval = 65535,
91                 .help   = "Port to use for RDMA connections",
92                 .category = FIO_OPT_C_ENGINE,
93                 .group  = FIO_OPT_G_RDMA,
94         },
95         {
96                 .name   = "verb",
97                 .lname  = "RDMA engine verb",
98                 .alias  = "proto",
99                 .type   = FIO_OPT_STR,
100                 .off1   = offsetof(struct rdmaio_options, verb),
101                 .help   = "RDMA engine verb",
102                 .def    = "write",
103                 .posval = {
104                           { .ival = "write",
105                             .oval = FIO_RDMA_MEM_WRITE,
106                             .help = "Memory Write",
107                           },
108                           { .ival = "read",
109                             .oval = FIO_RDMA_MEM_READ,
110                             .help = "Memory Read",
111                           },
112                           { .ival = "send",
113                             .oval = FIO_RDMA_CHA_SEND,
114                             .help = "Posted Send",
115                           },
116                           { .ival = "recv",
117                             .oval = FIO_RDMA_CHA_RECV,
118                             .help = "Posted Receive",
119                           },
120                 },
121                 .category = FIO_OPT_C_ENGINE,
122                 .group  = FIO_OPT_G_RDMA,
123         },
124         {
125                 .name   = NULL,
126         },
127 };
128
129 struct remote_u {
130         uint64_t buf;
131         uint32_t rkey;
132         uint32_t size;
133 };
134
135 struct rdma_info_blk {
136         uint32_t mode;          /* channel semantic or memory semantic */
137         uint32_t nr;            /* client: io depth
138                                    server: number of records for memory semantic
139                                  */
140         uint32_t max_bs;        /* maximum block size */
141         struct remote_u rmt_us[FIO_RDMA_MAX_IO_DEPTH];
142 };
143
144 struct rdma_io_u_data {
145         uint64_t wr_id;
146         struct ibv_send_wr sq_wr;
147         struct ibv_recv_wr rq_wr;
148         struct ibv_sge rdma_sgl;
149 };
150
151 struct rdmaio_data {
152         int is_client;
153         enum rdma_io_mode rdma_protocol;
154         char host[64];
155         struct sockaddr_in addr;
156
157         struct ibv_recv_wr rq_wr;
158         struct ibv_sge recv_sgl;
159         struct rdma_info_blk recv_buf;
160         struct ibv_mr *recv_mr;
161
162         struct ibv_send_wr sq_wr;
163         struct ibv_sge send_sgl;
164         struct rdma_info_blk send_buf;
165         struct ibv_mr *send_mr;
166
167         struct ibv_comp_channel *channel;
168         struct ibv_cq *cq;
169         struct ibv_pd *pd;
170         struct ibv_qp *qp;
171
172         pthread_t cmthread;
173         struct rdma_event_channel *cm_channel;
174         struct rdma_cm_id *cm_id;
175         struct rdma_cm_id *child_cm_id;
176
177         int cq_event_num;
178
179         struct remote_u *rmt_us;
180         int rmt_nr;
181         struct io_u **io_us_queued;
182         int io_u_queued_nr;
183         struct io_u **io_us_flight;
184         int io_u_flight_nr;
185         struct io_u **io_us_completed;
186         int io_u_completed_nr;
187
188         struct frand_state rand_state;
189 };
190
191 static int client_recv(struct thread_data *td, struct ibv_wc *wc)
192 {
193         struct rdmaio_data *rd = td->io_ops_data;
194         unsigned int max_bs;
195
196         if (wc->byte_len != sizeof(rd->recv_buf)) {
197                 log_err("Received bogus data, size %d\n", wc->byte_len);
198                 return 1;
199         }
200
201         max_bs = max(td->o.max_bs[DDIR_READ], td->o.max_bs[DDIR_WRITE]);
202         if (max_bs > ntohl(rd->recv_buf.max_bs)) {
203                 log_err("fio: Server's block size (%d) must be greater than or "
204                         "equal to the client's block size (%d)!\n",
205                         ntohl(rd->recv_buf.max_bs), max_bs);
206                 return 1;
207         }
208
209         /* store mr info for MEMORY semantic */
210         if ((rd->rdma_protocol == FIO_RDMA_MEM_WRITE) ||
211             (rd->rdma_protocol == FIO_RDMA_MEM_READ)) {
212                 /* struct flist_head *entry; */
213                 int i = 0;
214
215                 rd->rmt_nr = ntohl(rd->recv_buf.nr);
216
217                 for (i = 0; i < rd->rmt_nr; i++) {
218                         rd->rmt_us[i].buf = be64_to_cpu(rd->recv_buf.rmt_us[i].buf);
219                         rd->rmt_us[i].rkey = ntohl(rd->recv_buf.rmt_us[i].rkey);
220                         rd->rmt_us[i].size = ntohl(rd->recv_buf.rmt_us[i].size);
221
222                         dprint(FD_IO,
223                                "fio: Received rkey %x addr %" PRIx64
224                                " len %d from peer\n", rd->rmt_us[i].rkey,
225                                rd->rmt_us[i].buf, rd->rmt_us[i].size);
226                 }
227         }
228
229         return 0;
230 }
231
232 static int server_recv(struct thread_data *td, struct ibv_wc *wc)
233 {
234         struct rdmaio_data *rd = td->io_ops_data;
235         unsigned int max_bs;
236
237         if (wc->wr_id == FIO_RDMA_MAX_IO_DEPTH) {
238                 rd->rdma_protocol = ntohl(rd->recv_buf.mode);
239
240                 /* CHANNEL semantic, do nothing */
241                 if (rd->rdma_protocol == FIO_RDMA_CHA_SEND)
242                         rd->rdma_protocol = FIO_RDMA_CHA_RECV;
243
244                 max_bs = max(td->o.max_bs[DDIR_READ], td->o.max_bs[DDIR_WRITE]);
245                 if (max_bs < ntohl(rd->recv_buf.max_bs)) {
246                         log_err("fio: Server's block size (%d) must be greater than or "
247                                 "equal to the client's block size (%d)!\n",
248                                 ntohl(rd->recv_buf.max_bs), max_bs);
249                         return 1;
250                 }
251
252         }
253
254         return 0;
255 }
256
257 static int cq_event_handler(struct thread_data *td, enum ibv_wc_opcode opcode)
258 {
259         struct rdmaio_data *rd = td->io_ops_data;
260         struct ibv_wc wc;
261         struct rdma_io_u_data *r_io_u_d;
262         int ret;
263         int compevnum = 0;
264         int i;
265
266         while ((ret = ibv_poll_cq(rd->cq, 1, &wc)) == 1) {
267                 ret = 0;
268                 compevnum++;
269
270                 if (wc.status) {
271                         log_err("fio: cq completion status %d(%s)\n",
272                                 wc.status, ibv_wc_status_str(wc.status));
273                         return -1;
274                 }
275
276                 switch (wc.opcode) {
277
278                 case IBV_WC_RECV:
279                         if (rd->is_client == 1)
280                                 ret = client_recv(td, &wc);
281                         else
282                                 ret = server_recv(td, &wc);
283
284                         if (ret)
285                                 return -1;
286
287                         if (wc.wr_id == FIO_RDMA_MAX_IO_DEPTH)
288                                 break;
289
290                         for (i = 0; i < rd->io_u_flight_nr; i++) {
291                                 r_io_u_d = rd->io_us_flight[i]->engine_data;
292
293                                 if (wc.wr_id == r_io_u_d->rq_wr.wr_id) {
294                                         rd->io_us_flight[i]->resid =
295                                             rd->io_us_flight[i]->buflen
296                                             - wc.byte_len;
297
298                                         rd->io_us_flight[i]->error = 0;
299
300                                         rd->io_us_completed[rd->
301                                                             io_u_completed_nr]
302                                             = rd->io_us_flight[i];
303                                         rd->io_u_completed_nr++;
304                                         break;
305                                 }
306                         }
307                         if (i == rd->io_u_flight_nr)
308                                 log_err("fio: recv wr %" PRId64 " not found\n",
309                                         wc.wr_id);
310                         else {
311                                 /* put the last one into middle of the list */
312                                 rd->io_us_flight[i] =
313                                     rd->io_us_flight[rd->io_u_flight_nr - 1];
314                                 rd->io_u_flight_nr--;
315                         }
316
317                         break;
318
319                 case IBV_WC_SEND:
320                 case IBV_WC_RDMA_WRITE:
321                 case IBV_WC_RDMA_READ:
322                         if (wc.wr_id == FIO_RDMA_MAX_IO_DEPTH)
323                                 break;
324
325                         for (i = 0; i < rd->io_u_flight_nr; i++) {
326                                 r_io_u_d = rd->io_us_flight[i]->engine_data;
327
328                                 if (wc.wr_id == r_io_u_d->sq_wr.wr_id) {
329                                         rd->io_us_completed[rd->
330                                                             io_u_completed_nr]
331                                             = rd->io_us_flight[i];
332                                         rd->io_u_completed_nr++;
333                                         break;
334                                 }
335                         }
336                         if (i == rd->io_u_flight_nr)
337                                 log_err("fio: send wr %" PRId64 " not found\n",
338                                         wc.wr_id);
339                         else {
340                                 /* put the last one into middle of the list */
341                                 rd->io_us_flight[i] =
342                                     rd->io_us_flight[rd->io_u_flight_nr - 1];
343                                 rd->io_u_flight_nr--;
344                         }
345
346                         break;
347
348                 default:
349                         log_info("fio: unknown completion event %d\n",
350                                  wc.opcode);
351                         return -1;
352                 }
353                 rd->cq_event_num++;
354         }
355
356         if (ret) {
357                 log_err("fio: poll error %d\n", ret);
358                 return 1;
359         }
360
361         return compevnum;
362 }
363
364 /*
365  * Return -1 for error and 'nr events' for a positive number
366  * of events
367  */
368 static int rdma_poll_wait(struct thread_data *td, enum ibv_wc_opcode opcode)
369 {
370         struct rdmaio_data *rd = td->io_ops_data;
371         struct ibv_cq *ev_cq;
372         void *ev_ctx;
373         int ret;
374
375         if (rd->cq_event_num > 0) {     /* previous left */
376                 rd->cq_event_num--;
377                 return 0;
378         }
379
380 again:
381         if (ibv_get_cq_event(rd->channel, &ev_cq, &ev_ctx) != 0) {
382                 log_err("fio: Failed to get cq event!\n");
383                 return -1;
384         }
385         if (ev_cq != rd->cq) {
386                 log_err("fio: Unknown CQ!\n");
387                 return -1;
388         }
389         if (ibv_req_notify_cq(rd->cq, 0) != 0) {
390                 log_err("fio: Failed to set notify!\n");
391                 return -1;
392         }
393
394         ret = cq_event_handler(td, opcode);
395         if (ret == 0)
396                 goto again;
397
398         ibv_ack_cq_events(rd->cq, ret);
399
400         rd->cq_event_num--;
401
402         return ret;
403 }
404
405 static int fio_rdmaio_setup_qp(struct thread_data *td)
406 {
407         struct rdmaio_data *rd = td->io_ops_data;
408         struct ibv_qp_init_attr init_attr;
409         int qp_depth = td->o.iodepth * 2;       /* 2 times of io depth */
410
411         if (rd->is_client == 0)
412                 rd->pd = ibv_alloc_pd(rd->child_cm_id->verbs);
413         else
414                 rd->pd = ibv_alloc_pd(rd->cm_id->verbs);
415
416         if (rd->pd == NULL) {
417                 log_err("fio: ibv_alloc_pd fail: %m\n");
418                 return 1;
419         }
420
421         if (rd->is_client == 0)
422                 rd->channel = ibv_create_comp_channel(rd->child_cm_id->verbs);
423         else
424                 rd->channel = ibv_create_comp_channel(rd->cm_id->verbs);
425         if (rd->channel == NULL) {
426                 log_err("fio: ibv_create_comp_channel fail: %m\n");
427                 goto err1;
428         }
429
430         if (qp_depth < 16)
431                 qp_depth = 16;
432
433         if (rd->is_client == 0)
434                 rd->cq = ibv_create_cq(rd->child_cm_id->verbs,
435                                        qp_depth, rd, rd->channel, 0);
436         else
437                 rd->cq = ibv_create_cq(rd->cm_id->verbs,
438                                        qp_depth, rd, rd->channel, 0);
439         if (rd->cq == NULL) {
440                 log_err("fio: ibv_create_cq failed: %m\n");
441                 goto err2;
442         }
443
444         if (ibv_req_notify_cq(rd->cq, 0) != 0) {
445                 log_err("fio: ibv_req_notify_cq failed: %m\n");
446                 goto err3;
447         }
448
449         /* create queue pair */
450         memset(&init_attr, 0, sizeof(init_attr));
451         init_attr.cap.max_send_wr = qp_depth;
452         init_attr.cap.max_recv_wr = qp_depth;
453         init_attr.cap.max_recv_sge = 1;
454         init_attr.cap.max_send_sge = 1;
455         init_attr.qp_type = IBV_QPT_RC;
456         init_attr.send_cq = rd->cq;
457         init_attr.recv_cq = rd->cq;
458
459         if (rd->is_client == 0) {
460                 if (rdma_create_qp(rd->child_cm_id, rd->pd, &init_attr) != 0) {
461                         log_err("fio: rdma_create_qp failed: %m\n");
462                         goto err3;
463                 }
464                 rd->qp = rd->child_cm_id->qp;
465         } else {
466                 if (rdma_create_qp(rd->cm_id, rd->pd, &init_attr) != 0) {
467                         log_err("fio: rdma_create_qp failed: %m\n");
468                         goto err3;
469                 }
470                 rd->qp = rd->cm_id->qp;
471         }
472
473         return 0;
474
475 err3:
476         ibv_destroy_cq(rd->cq);
477 err2:
478         ibv_destroy_comp_channel(rd->channel);
479 err1:
480         ibv_dealloc_pd(rd->pd);
481
482         return 1;
483 }
484
485 static int fio_rdmaio_setup_control_msg_buffers(struct thread_data *td)
486 {
487         struct rdmaio_data *rd = td->io_ops_data;
488
489         rd->recv_mr = ibv_reg_mr(rd->pd, &rd->recv_buf, sizeof(rd->recv_buf),
490                                  IBV_ACCESS_LOCAL_WRITE);
491         if (rd->recv_mr == NULL) {
492                 log_err("fio: recv_buf reg_mr failed: %m\n");
493                 return 1;
494         }
495
496         rd->send_mr = ibv_reg_mr(rd->pd, &rd->send_buf, sizeof(rd->send_buf),
497                                  0);
498         if (rd->send_mr == NULL) {
499                 log_err("fio: send_buf reg_mr failed: %m\n");
500                 ibv_dereg_mr(rd->recv_mr);
501                 return 1;
502         }
503
504         /* setup work request */
505         /* recv wq */
506         rd->recv_sgl.addr = (uint64_t) (unsigned long)&rd->recv_buf;
507         rd->recv_sgl.length = sizeof(rd->recv_buf);
508         rd->recv_sgl.lkey = rd->recv_mr->lkey;
509         rd->rq_wr.sg_list = &rd->recv_sgl;
510         rd->rq_wr.num_sge = 1;
511         rd->rq_wr.wr_id = FIO_RDMA_MAX_IO_DEPTH;
512
513         /* send wq */
514         rd->send_sgl.addr = (uint64_t) (unsigned long)&rd->send_buf;
515         rd->send_sgl.length = sizeof(rd->send_buf);
516         rd->send_sgl.lkey = rd->send_mr->lkey;
517
518         rd->sq_wr.opcode = IBV_WR_SEND;
519         rd->sq_wr.send_flags = IBV_SEND_SIGNALED;
520         rd->sq_wr.sg_list = &rd->send_sgl;
521         rd->sq_wr.num_sge = 1;
522         rd->sq_wr.wr_id = FIO_RDMA_MAX_IO_DEPTH;
523
524         return 0;
525 }
526
527 static int get_next_channel_event(struct thread_data *td,
528                                   struct rdma_event_channel *channel,
529                                   enum rdma_cm_event_type wait_event)
530 {
531         struct rdmaio_data *rd = td->io_ops_data;
532         struct rdma_cm_event *event;
533         int ret;
534
535         ret = rdma_get_cm_event(channel, &event);
536         if (ret) {
537                 log_err("fio: rdma_get_cm_event: %d\n", ret);
538                 return 1;
539         }
540
541         if (event->event != wait_event) {
542                 log_err("fio: event is %s instead of %s\n",
543                         rdma_event_str(event->event),
544                         rdma_event_str(wait_event));
545                 return 1;
546         }
547
548         switch (event->event) {
549         case RDMA_CM_EVENT_CONNECT_REQUEST:
550                 rd->child_cm_id = event->id;
551                 break;
552         default:
553                 break;
554         }
555
556         rdma_ack_cm_event(event);
557
558         return 0;
559 }
560
561 static int fio_rdmaio_prep(struct thread_data *td, struct io_u *io_u)
562 {
563         struct rdmaio_data *rd = td->io_ops_data;
564         struct rdma_io_u_data *r_io_u_d;
565
566         r_io_u_d = io_u->engine_data;
567
568         switch (rd->rdma_protocol) {
569         case FIO_RDMA_MEM_WRITE:
570         case FIO_RDMA_MEM_READ:
571                 r_io_u_d->rdma_sgl.addr = (uint64_t) (unsigned long)io_u->buf;
572                 r_io_u_d->rdma_sgl.lkey = io_u->mr->lkey;
573                 r_io_u_d->sq_wr.wr_id = r_io_u_d->wr_id;
574                 r_io_u_d->sq_wr.send_flags = IBV_SEND_SIGNALED;
575                 r_io_u_d->sq_wr.sg_list = &r_io_u_d->rdma_sgl;
576                 r_io_u_d->sq_wr.num_sge = 1;
577                 break;
578         case FIO_RDMA_CHA_SEND:
579                 r_io_u_d->rdma_sgl.addr = (uint64_t) (unsigned long)io_u->buf;
580                 r_io_u_d->rdma_sgl.lkey = io_u->mr->lkey;
581                 r_io_u_d->rdma_sgl.length = io_u->buflen;
582                 r_io_u_d->sq_wr.wr_id = r_io_u_d->wr_id;
583                 r_io_u_d->sq_wr.opcode = IBV_WR_SEND;
584                 r_io_u_d->sq_wr.send_flags = IBV_SEND_SIGNALED;
585                 r_io_u_d->sq_wr.sg_list = &r_io_u_d->rdma_sgl;
586                 r_io_u_d->sq_wr.num_sge = 1;
587                 break;
588         case FIO_RDMA_CHA_RECV:
589                 r_io_u_d->rdma_sgl.addr = (uint64_t) (unsigned long)io_u->buf;
590                 r_io_u_d->rdma_sgl.lkey = io_u->mr->lkey;
591                 r_io_u_d->rdma_sgl.length = io_u->buflen;
592                 r_io_u_d->rq_wr.wr_id = r_io_u_d->wr_id;
593                 r_io_u_d->rq_wr.sg_list = &r_io_u_d->rdma_sgl;
594                 r_io_u_d->rq_wr.num_sge = 1;
595                 break;
596         default:
597                 log_err("fio: unknown rdma protocol - %d\n", rd->rdma_protocol);
598                 break;
599         }
600
601         return 0;
602 }
603
604 static struct io_u *fio_rdmaio_event(struct thread_data *td, int event)
605 {
606         struct rdmaio_data *rd = td->io_ops_data;
607         struct io_u *io_u;
608         int i;
609
610         io_u = rd->io_us_completed[0];
611         for (i = 0; i < rd->io_u_completed_nr - 1; i++)
612                 rd->io_us_completed[i] = rd->io_us_completed[i + 1];
613
614         rd->io_u_completed_nr--;
615
616         dprint_io_u(io_u, "fio_rdmaio_event");
617
618         return io_u;
619 }
620
621 static int fio_rdmaio_getevents(struct thread_data *td, unsigned int min,
622                                 unsigned int max, const struct timespec *t)
623 {
624         struct rdmaio_data *rd = td->io_ops_data;
625         enum ibv_wc_opcode comp_opcode;
626         struct ibv_cq *ev_cq;
627         void *ev_ctx;
628         int ret, r = 0;
629         comp_opcode = IBV_WC_RDMA_WRITE;
630
631         switch (rd->rdma_protocol) {
632         case FIO_RDMA_MEM_WRITE:
633                 comp_opcode = IBV_WC_RDMA_WRITE;
634                 break;
635         case FIO_RDMA_MEM_READ:
636                 comp_opcode = IBV_WC_RDMA_READ;
637                 break;
638         case FIO_RDMA_CHA_SEND:
639                 comp_opcode = IBV_WC_SEND;
640                 break;
641         case FIO_RDMA_CHA_RECV:
642                 comp_opcode = IBV_WC_RECV;
643                 break;
644         default:
645                 log_err("fio: unknown rdma protocol - %d\n", rd->rdma_protocol);
646                 break;
647         }
648
649         if (rd->cq_event_num > 0) {     /* previous left */
650                 rd->cq_event_num--;
651                 return 0;
652         }
653
654 again:
655         if (ibv_get_cq_event(rd->channel, &ev_cq, &ev_ctx) != 0) {
656                 log_err("fio: Failed to get cq event!\n");
657                 return -1;
658         }
659         if (ev_cq != rd->cq) {
660                 log_err("fio: Unknown CQ!\n");
661                 return -1;
662         }
663         if (ibv_req_notify_cq(rd->cq, 0) != 0) {
664                 log_err("fio: Failed to set notify!\n");
665                 return -1;
666         }
667
668         ret = cq_event_handler(td, comp_opcode);
669         if (ret < 1)
670                 goto again;
671
672         ibv_ack_cq_events(rd->cq, ret);
673
674         r += ret;
675         if (r < min)
676                 goto again;
677
678         rd->cq_event_num -= r;
679
680         return r;
681 }
682
683 static int fio_rdmaio_send(struct thread_data *td, struct io_u **io_us,
684                            unsigned int nr)
685 {
686         struct rdmaio_data *rd = td->io_ops_data;
687         struct ibv_send_wr *bad_wr;
688 #if 0
689         enum ibv_wc_opcode comp_opcode;
690         comp_opcode = IBV_WC_RDMA_WRITE;
691 #endif
692         int i;
693         long index;
694         struct rdma_io_u_data *r_io_u_d;
695
696         r_io_u_d = NULL;
697
698         for (i = 0; i < nr; i++) {
699                 /* RDMA_WRITE or RDMA_READ */
700                 switch (rd->rdma_protocol) {
701                 case FIO_RDMA_MEM_WRITE:
702                         /* compose work request */
703                         r_io_u_d = io_us[i]->engine_data;
704                         index = __rand(&rd->rand_state) % rd->rmt_nr;
705                         r_io_u_d->sq_wr.opcode = IBV_WR_RDMA_WRITE;
706                         r_io_u_d->sq_wr.wr.rdma.rkey = rd->rmt_us[index].rkey;
707                         r_io_u_d->sq_wr.wr.rdma.remote_addr = \
708                                 rd->rmt_us[index].buf;
709                         r_io_u_d->sq_wr.sg_list->length = io_us[i]->buflen;
710                         break;
711                 case FIO_RDMA_MEM_READ:
712                         /* compose work request */
713                         r_io_u_d = io_us[i]->engine_data;
714                         index = __rand(&rd->rand_state) % rd->rmt_nr;
715                         r_io_u_d->sq_wr.opcode = IBV_WR_RDMA_READ;
716                         r_io_u_d->sq_wr.wr.rdma.rkey = rd->rmt_us[index].rkey;
717                         r_io_u_d->sq_wr.wr.rdma.remote_addr = \
718                                 rd->rmt_us[index].buf;
719                         r_io_u_d->sq_wr.sg_list->length = io_us[i]->buflen;
720                         break;
721                 case FIO_RDMA_CHA_SEND:
722                         r_io_u_d = io_us[i]->engine_data;
723                         r_io_u_d->sq_wr.opcode = IBV_WR_SEND;
724                         r_io_u_d->sq_wr.send_flags = IBV_SEND_SIGNALED;
725                         break;
726                 default:
727                         log_err("fio: unknown rdma protocol - %d\n",
728                                 rd->rdma_protocol);
729                         break;
730                 }
731
732                 if (ibv_post_send(rd->qp, &r_io_u_d->sq_wr, &bad_wr) != 0) {
733                         log_err("fio: ibv_post_send fail: %m\n");
734                         return -1;
735                 }
736
737                 dprint_io_u(io_us[i], "fio_rdmaio_send");
738         }
739
740         /* wait for completion
741            rdma_poll_wait(td, comp_opcode); */
742
743         return i;
744 }
745
746 static int fio_rdmaio_recv(struct thread_data *td, struct io_u **io_us,
747                            unsigned int nr)
748 {
749         struct rdmaio_data *rd = td->io_ops_data;
750         struct ibv_recv_wr *bad_wr;
751         struct rdma_io_u_data *r_io_u_d;
752         int i;
753
754         i = 0;
755         if (rd->rdma_protocol == FIO_RDMA_CHA_RECV) {
756                 /* post io_u into recv queue */
757                 for (i = 0; i < nr; i++) {
758                         r_io_u_d = io_us[i]->engine_data;
759                         if (ibv_post_recv(rd->qp, &r_io_u_d->rq_wr, &bad_wr) !=
760                             0) {
761                                 log_err("fio: ibv_post_recv fail: %m\n");
762                                 return 1;
763                         }
764                 }
765         } else if ((rd->rdma_protocol == FIO_RDMA_MEM_READ)
766                    || (rd->rdma_protocol == FIO_RDMA_MEM_WRITE)) {
767                 /* re-post the rq_wr */
768                 if (ibv_post_recv(rd->qp, &rd->rq_wr, &bad_wr) != 0) {
769                         log_err("fio: ibv_post_recv fail: %m\n");
770                         return 1;
771                 }
772
773                 rdma_poll_wait(td, IBV_WC_RECV);
774
775                 dprint(FD_IO, "fio: recv FINISH message\n");
776                 td->done = 1;
777                 return 0;
778         }
779
780         return i;
781 }
782
783 static int fio_rdmaio_queue(struct thread_data *td, struct io_u *io_u)
784 {
785         struct rdmaio_data *rd = td->io_ops_data;
786
787         fio_ro_check(td, io_u);
788
789         if (rd->io_u_queued_nr == (int)td->o.iodepth)
790                 return FIO_Q_BUSY;
791
792         rd->io_us_queued[rd->io_u_queued_nr] = io_u;
793         rd->io_u_queued_nr++;
794
795         dprint_io_u(io_u, "fio_rdmaio_queue");
796
797         return FIO_Q_QUEUED;
798 }
799
800 static void fio_rdmaio_queued(struct thread_data *td, struct io_u **io_us,
801                               unsigned int nr)
802 {
803         struct rdmaio_data *rd = td->io_ops_data;
804         struct timespec now;
805         unsigned int i;
806
807         if (!fio_fill_issue_time(td))
808                 return;
809
810         fio_gettime(&now, NULL);
811
812         for (i = 0; i < nr; i++) {
813                 struct io_u *io_u = io_us[i];
814
815                 /* queued -> flight */
816                 rd->io_us_flight[rd->io_u_flight_nr] = io_u;
817                 rd->io_u_flight_nr++;
818
819                 memcpy(&io_u->issue_time, &now, sizeof(now));
820                 io_u_queued(td, io_u);
821         }
822 }
823
824 static int fio_rdmaio_commit(struct thread_data *td)
825 {
826         struct rdmaio_data *rd = td->io_ops_data;
827         struct io_u **io_us;
828         int ret;
829
830         if (!rd->io_us_queued)
831                 return 0;
832
833         io_us = rd->io_us_queued;
834         do {
835                 /* RDMA_WRITE or RDMA_READ */
836                 if (rd->is_client)
837                         ret = fio_rdmaio_send(td, io_us, rd->io_u_queued_nr);
838                 else if (!rd->is_client)
839                         ret = fio_rdmaio_recv(td, io_us, rd->io_u_queued_nr);
840                 else
841                         ret = 0;        /* must be a SYNC */
842
843                 if (ret > 0) {
844                         fio_rdmaio_queued(td, io_us, ret);
845                         io_u_mark_submit(td, ret);
846                         rd->io_u_queued_nr -= ret;
847                         io_us += ret;
848                         ret = 0;
849                 } else
850                         break;
851         } while (rd->io_u_queued_nr);
852
853         return ret;
854 }
855
856 static int fio_rdmaio_connect(struct thread_data *td, struct fio_file *f)
857 {
858         struct rdmaio_data *rd = td->io_ops_data;
859         struct rdma_conn_param conn_param;
860         struct ibv_send_wr *bad_wr;
861
862         memset(&conn_param, 0, sizeof(conn_param));
863         conn_param.responder_resources = 1;
864         conn_param.initiator_depth = 1;
865         conn_param.retry_count = 10;
866
867         if (rdma_connect(rd->cm_id, &conn_param) != 0) {
868                 log_err("fio: rdma_connect fail: %m\n");
869                 return 1;
870         }
871
872         if (get_next_channel_event
873             (td, rd->cm_channel, RDMA_CM_EVENT_ESTABLISHED) != 0) {
874                 log_err("fio: wait for RDMA_CM_EVENT_ESTABLISHED\n");
875                 return 1;
876         }
877
878         /* send task request */
879         rd->send_buf.mode = htonl(rd->rdma_protocol);
880         rd->send_buf.nr = htonl(td->o.iodepth);
881
882         if (ibv_post_send(rd->qp, &rd->sq_wr, &bad_wr) != 0) {
883                 log_err("fio: ibv_post_send fail: %m\n");
884                 return 1;
885         }
886
887         if (rdma_poll_wait(td, IBV_WC_SEND) < 0)
888                 return 1;
889
890         /* wait for remote MR info from server side */
891         if (rdma_poll_wait(td, IBV_WC_RECV) < 0)
892                 return 1;
893
894         /* In SEND/RECV test, it's a good practice to setup the iodepth of
895          * of the RECV side deeper than that of the SEND side to
896          * avoid RNR (receiver not ready) error. The
897          * SEND side may send so many unsolicited message before
898          * RECV side commits sufficient recv buffers into recv queue.
899          * This may lead to RNR error. Here, SEND side pauses for a while
900          * during which RECV side commits sufficient recv buffers.
901          */
902         usleep(500000);
903
904         return 0;
905 }
906
907 static int fio_rdmaio_accept(struct thread_data *td, struct fio_file *f)
908 {
909         struct rdmaio_data *rd = td->io_ops_data;
910         struct rdma_conn_param conn_param;
911         struct ibv_send_wr *bad_wr;
912         int ret = 0;
913
914         /* rdma_accept() - then wait for accept success */
915         memset(&conn_param, 0, sizeof(conn_param));
916         conn_param.responder_resources = 1;
917         conn_param.initiator_depth = 1;
918
919         if (rdma_accept(rd->child_cm_id, &conn_param) != 0) {
920                 log_err("fio: rdma_accept: %m\n");
921                 return 1;
922         }
923
924         if (get_next_channel_event
925             (td, rd->cm_channel, RDMA_CM_EVENT_ESTABLISHED) != 0) {
926                 log_err("fio: wait for RDMA_CM_EVENT_ESTABLISHED\n");
927                 return 1;
928         }
929
930         /* wait for request */
931         ret = rdma_poll_wait(td, IBV_WC_RECV) < 0;
932
933         if (ibv_post_send(rd->qp, &rd->sq_wr, &bad_wr) != 0) {
934                 log_err("fio: ibv_post_send fail: %m\n");
935                 return 1;
936         }
937
938         if (rdma_poll_wait(td, IBV_WC_SEND) < 0)
939                 return 1;
940
941         return ret;
942 }
943
944 static int fio_rdmaio_open_file(struct thread_data *td, struct fio_file *f)
945 {
946         if (td_read(td))
947                 return fio_rdmaio_accept(td, f);
948         else
949                 return fio_rdmaio_connect(td, f);
950 }
951
952 static int fio_rdmaio_close_file(struct thread_data *td, struct fio_file *f)
953 {
954         struct rdmaio_data *rd = td->io_ops_data;
955         struct ibv_send_wr *bad_wr;
956
957         /* unregister rdma buffer */
958
959         /*
960          * Client sends notification to the server side
961          */
962         /* refer to: http://linux.die.net/man/7/rdma_cm */
963         if ((rd->is_client == 1) && ((rd->rdma_protocol == FIO_RDMA_MEM_WRITE)
964                                      || (rd->rdma_protocol ==
965                                          FIO_RDMA_MEM_READ))) {
966                 if (ibv_post_send(rd->qp, &rd->sq_wr, &bad_wr) != 0) {
967                         log_err("fio: ibv_post_send fail: %m\n");
968                         return 1;
969                 }
970
971                 dprint(FD_IO, "fio: close information sent success\n");
972                 rdma_poll_wait(td, IBV_WC_SEND);
973         }
974
975         if (rd->is_client == 1)
976                 rdma_disconnect(rd->cm_id);
977         else {
978                 rdma_disconnect(rd->child_cm_id);
979 #if 0
980                 rdma_disconnect(rd->cm_id);
981 #endif
982         }
983
984 #if 0
985         if (get_next_channel_event(td, rd->cm_channel, RDMA_CM_EVENT_DISCONNECTED) != 0) {
986                 log_err("fio: wait for RDMA_CM_EVENT_DISCONNECTED\n");
987                 return 1;
988         }
989 #endif
990
991         ibv_destroy_cq(rd->cq);
992         ibv_destroy_qp(rd->qp);
993
994         if (rd->is_client == 1)
995                 rdma_destroy_id(rd->cm_id);
996         else {
997                 rdma_destroy_id(rd->child_cm_id);
998                 rdma_destroy_id(rd->cm_id);
999         }
1000
1001         ibv_destroy_comp_channel(rd->channel);
1002         ibv_dealloc_pd(rd->pd);
1003
1004         return 0;
1005 }
1006
1007 static int fio_rdmaio_setup_connect(struct thread_data *td, const char *host,
1008                                     unsigned short port)
1009 {
1010         struct rdmaio_data *rd = td->io_ops_data;
1011         struct ibv_recv_wr *bad_wr;
1012         int err;
1013
1014         rd->addr.sin_family = AF_INET;
1015         rd->addr.sin_port = htons(port);
1016
1017         if (inet_aton(host, &rd->addr.sin_addr) != 1) {
1018                 struct hostent *hent;
1019
1020                 hent = gethostbyname(host);
1021                 if (!hent) {
1022                         td_verror(td, errno, "gethostbyname");
1023                         return 1;
1024                 }
1025
1026                 memcpy(&rd->addr.sin_addr, hent->h_addr, 4);
1027         }
1028
1029         /* resolve route */
1030         err = rdma_resolve_addr(rd->cm_id, NULL, (struct sockaddr *)&rd->addr, 2000);
1031         if (err != 0) {
1032                 log_err("fio: rdma_resolve_addr: %d\n", err);
1033                 return 1;
1034         }
1035
1036         err = get_next_channel_event(td, rd->cm_channel, RDMA_CM_EVENT_ADDR_RESOLVED);
1037         if (err != 0) {
1038                 log_err("fio: get_next_channel_event: %d\n", err);
1039                 return 1;
1040         }
1041
1042         /* resolve route */
1043         err = rdma_resolve_route(rd->cm_id, 2000);
1044         if (err != 0) {
1045                 log_err("fio: rdma_resolve_route: %d\n", err);
1046                 return 1;
1047         }
1048
1049         err = get_next_channel_event(td, rd->cm_channel, RDMA_CM_EVENT_ROUTE_RESOLVED);
1050         if (err != 0) {
1051                 log_err("fio: get_next_channel_event: %d\n", err);
1052                 return 1;
1053         }
1054
1055         /* create qp and buffer */
1056         if (fio_rdmaio_setup_qp(td) != 0)
1057                 return 1;
1058
1059         if (fio_rdmaio_setup_control_msg_buffers(td) != 0)
1060                 return 1;
1061
1062         /* post recv buf */
1063         err = ibv_post_recv(rd->qp, &rd->rq_wr, &bad_wr);
1064         if (err != 0) {
1065                 log_err("fio: ibv_post_recv fail: %d\n", err);
1066                 return 1;
1067         }
1068
1069         return 0;
1070 }
1071
1072 static int fio_rdmaio_setup_listen(struct thread_data *td, short port)
1073 {
1074         struct rdmaio_data *rd = td->io_ops_data;
1075         struct ibv_recv_wr *bad_wr;
1076         int state = td->runstate;
1077
1078         td_set_runstate(td, TD_SETTING_UP);
1079
1080         rd->addr.sin_family = AF_INET;
1081         rd->addr.sin_addr.s_addr = htonl(INADDR_ANY);
1082         rd->addr.sin_port = htons(port);
1083
1084         /* rdma_listen */
1085         if (rdma_bind_addr(rd->cm_id, (struct sockaddr *)&rd->addr) != 0) {
1086                 log_err("fio: rdma_bind_addr fail: %m\n");
1087                 return 1;
1088         }
1089
1090         if (rdma_listen(rd->cm_id, 3) != 0) {
1091                 log_err("fio: rdma_listen fail: %m\n");
1092                 return 1;
1093         }
1094
1095         log_info("fio: waiting for connection\n");
1096
1097         /* wait for CONNECT_REQUEST */
1098         if (get_next_channel_event
1099             (td, rd->cm_channel, RDMA_CM_EVENT_CONNECT_REQUEST) != 0) {
1100                 log_err("fio: wait for RDMA_CM_EVENT_CONNECT_REQUEST\n");
1101                 return 1;
1102         }
1103
1104         if (fio_rdmaio_setup_qp(td) != 0)
1105                 return 1;
1106
1107         if (fio_rdmaio_setup_control_msg_buffers(td) != 0)
1108                 return 1;
1109
1110         /* post recv buf */
1111         if (ibv_post_recv(rd->qp, &rd->rq_wr, &bad_wr) != 0) {
1112                 log_err("fio: ibv_post_recv fail: %m\n");
1113                 return 1;
1114         }
1115
1116         td_set_runstate(td, state);
1117         return 0;
1118 }
1119
1120 static int check_set_rlimits(struct thread_data *td)
1121 {
1122 #ifdef CONFIG_RLIMIT_MEMLOCK
1123         struct rlimit rl;
1124
1125         /* check RLIMIT_MEMLOCK */
1126         if (getrlimit(RLIMIT_MEMLOCK, &rl) != 0) {
1127                 log_err("fio: getrlimit fail: %d(%s)\n",
1128                         errno, strerror(errno));
1129                 return 1;
1130         }
1131
1132         /* soft limit */
1133         if ((rl.rlim_cur != RLIM_INFINITY)
1134             && (rl.rlim_cur < td->orig_buffer_size)) {
1135                 log_err("fio: soft RLIMIT_MEMLOCK is: %" PRId64 "\n",
1136                         rl.rlim_cur);
1137                 log_err("fio: total block size is:    %zd\n",
1138                         td->orig_buffer_size);
1139                 /* try to set larger RLIMIT_MEMLOCK */
1140                 rl.rlim_cur = rl.rlim_max;
1141                 if (setrlimit(RLIMIT_MEMLOCK, &rl) != 0) {
1142                         log_err("fio: setrlimit fail: %d(%s)\n",
1143                                 errno, strerror(errno));
1144                         log_err("fio: you may try enlarge MEMLOCK by root\n");
1145                         log_err("# ulimit -l unlimited\n");
1146                         return 1;
1147                 }
1148         }
1149 #endif
1150
1151         return 0;
1152 }
1153
1154 static int compat_options(struct thread_data *td)
1155 {
1156         // The original RDMA engine had an ugly / seperator
1157         // on the filename for it's options. This function
1158         // retains backwards compatibility with it.100
1159
1160         struct rdmaio_options *o = td->eo;
1161         char *modep, *portp;
1162         char *filename = td->o.filename;
1163
1164         if (!filename)
1165                 return 0;
1166
1167         portp = strchr(filename, '/');
1168         if (portp == NULL)
1169                 return 0;
1170
1171         *portp = '\0';
1172         portp++;
1173
1174         o->port = strtol(portp, NULL, 10);
1175         if (!o->port || o->port > 65535)
1176                 goto bad_host;
1177
1178         modep = strchr(portp, '/');
1179         if (modep != NULL) {
1180                 *modep = '\0';
1181                 modep++;
1182         }
1183
1184         if (modep) {
1185                 if (!strncmp("rdma_write", modep, strlen(modep)) ||
1186                     !strncmp("RDMA_WRITE", modep, strlen(modep)))
1187                         o->verb = FIO_RDMA_MEM_WRITE;
1188                 else if (!strncmp("rdma_read", modep, strlen(modep)) ||
1189                          !strncmp("RDMA_READ", modep, strlen(modep)))
1190                         o->verb = FIO_RDMA_MEM_READ;
1191                 else if (!strncmp("send", modep, strlen(modep)) ||
1192                          !strncmp("SEND", modep, strlen(modep)))
1193                         o->verb = FIO_RDMA_CHA_SEND;
1194                 else
1195                         goto bad_host;
1196         } else
1197                 o->verb = FIO_RDMA_MEM_WRITE;
1198
1199
1200         return 0;
1201
1202 bad_host:
1203         log_err("fio: bad rdma host/port/protocol: %s\n", td->o.filename);
1204         return 1;
1205 }
1206
1207 static int fio_rdmaio_init(struct thread_data *td)
1208 {
1209         struct rdmaio_data *rd = td->io_ops_data;
1210         struct rdmaio_options *o = td->eo;
1211         unsigned int max_bs;
1212         int ret, i;
1213
1214         if (td_rw(td)) {
1215                 log_err("fio: rdma connections must be read OR write\n");
1216                 return 1;
1217         }
1218         if (td_random(td)) {
1219                 log_err("fio: RDMA network IO can't be random\n");
1220                 return 1;
1221         }
1222
1223         if (compat_options(td))
1224                 return 1;
1225
1226         if (!o->port) {
1227                 log_err("fio: no port has been specified which is required "
1228                         "for the rdma engine\n");
1229                 return 1;
1230         }
1231
1232         if (check_set_rlimits(td))
1233                 return 1;
1234
1235         rd->rdma_protocol = o->verb;
1236         rd->cq_event_num = 0;
1237
1238         rd->cm_channel = rdma_create_event_channel();
1239         if (!rd->cm_channel) {
1240                 log_err("fio: rdma_create_event_channel fail: %m\n");
1241                 return 1;
1242         }
1243
1244         ret = rdma_create_id(rd->cm_channel, &rd->cm_id, rd, RDMA_PS_TCP);
1245         if (ret) {
1246                 log_err("fio: rdma_create_id fail: %m\n");
1247                 return 1;
1248         }
1249
1250         if ((rd->rdma_protocol == FIO_RDMA_MEM_WRITE) ||
1251             (rd->rdma_protocol == FIO_RDMA_MEM_READ)) {
1252                 rd->rmt_us =
1253                         malloc(FIO_RDMA_MAX_IO_DEPTH * sizeof(struct remote_u));
1254                 memset(rd->rmt_us, 0,
1255                         FIO_RDMA_MAX_IO_DEPTH * sizeof(struct remote_u));
1256                 rd->rmt_nr = 0;
1257         }
1258
1259         rd->io_us_queued = malloc(td->o.iodepth * sizeof(struct io_u *));
1260         memset(rd->io_us_queued, 0, td->o.iodepth * sizeof(struct io_u *));
1261         rd->io_u_queued_nr = 0;
1262
1263         rd->io_us_flight = malloc(td->o.iodepth * sizeof(struct io_u *));
1264         memset(rd->io_us_flight, 0, td->o.iodepth * sizeof(struct io_u *));
1265         rd->io_u_flight_nr = 0;
1266
1267         rd->io_us_completed = malloc(td->o.iodepth * sizeof(struct io_u *));
1268         memset(rd->io_us_completed, 0, td->o.iodepth * sizeof(struct io_u *));
1269         rd->io_u_completed_nr = 0;
1270
1271         if (td_read(td)) {      /* READ as the server */
1272                 rd->is_client = 0;
1273                 td->flags |= TD_F_NO_PROGRESS;
1274                 /* server rd->rdma_buf_len will be setup after got request */
1275                 ret = fio_rdmaio_setup_listen(td, o->port);
1276         } else {                /* WRITE as the client */
1277                 rd->is_client = 1;
1278                 ret = fio_rdmaio_setup_connect(td, td->o.filename, o->port);
1279         }
1280
1281         max_bs = max(td->o.max_bs[DDIR_READ], td->o.max_bs[DDIR_WRITE]);
1282         rd->send_buf.max_bs = htonl(max_bs);
1283
1284         /* register each io_u in the free list */
1285         for (i = 0; i < td->io_u_freelist.nr; i++) {
1286                 struct io_u *io_u = td->io_u_freelist.io_us[i];
1287
1288                 io_u->engine_data = malloc(sizeof(struct rdma_io_u_data));
1289                 memset(io_u->engine_data, 0, sizeof(struct rdma_io_u_data));
1290                 ((struct rdma_io_u_data *)io_u->engine_data)->wr_id = i;
1291
1292                 io_u->mr = ibv_reg_mr(rd->pd, io_u->buf, max_bs,
1293                                       IBV_ACCESS_LOCAL_WRITE |
1294                                       IBV_ACCESS_REMOTE_READ |
1295                                       IBV_ACCESS_REMOTE_WRITE);
1296                 if (io_u->mr == NULL) {
1297                         log_err("fio: ibv_reg_mr io_u failed: %m\n");
1298                         return 1;
1299                 }
1300
1301                 rd->send_buf.rmt_us[i].buf =
1302                     cpu_to_be64((uint64_t) (unsigned long)io_u->buf);
1303                 rd->send_buf.rmt_us[i].rkey = htonl(io_u->mr->rkey);
1304                 rd->send_buf.rmt_us[i].size = htonl(max_bs);
1305
1306 #if 0
1307                 log_info("fio: Send rkey %x addr %" PRIx64 " len %d to client\n", io_u->mr->rkey, io_u->buf, max_bs); */
1308 #endif
1309         }
1310
1311         rd->send_buf.nr = htonl(i);
1312
1313         return ret;
1314 }
1315
1316 static void fio_rdmaio_cleanup(struct thread_data *td)
1317 {
1318         struct rdmaio_data *rd = td->io_ops_data;
1319
1320         if (rd)
1321                 free(rd);
1322 }
1323
1324 static int fio_rdmaio_setup(struct thread_data *td)
1325 {
1326         struct rdmaio_data *rd;
1327
1328         if (!td->files_index) {
1329                 add_file(td, td->o.filename ?: "rdma", 0, 0);
1330                 td->o.nr_files = td->o.nr_files ?: 1;
1331                 td->o.open_files++;
1332         }
1333
1334         if (!td->io_ops_data) {
1335                 rd = malloc(sizeof(*rd));
1336
1337                 memset(rd, 0, sizeof(*rd));
1338                 init_rand_seed(&rd->rand_state, (unsigned int) GOLDEN_RATIO_PRIME, 0);
1339                 td->io_ops_data = rd;
1340         }
1341
1342         return 0;
1343 }
1344
1345 static struct ioengine_ops ioengine_rw = {
1346         .name                   = "rdma",
1347         .version                = FIO_IOOPS_VERSION,
1348         .setup                  = fio_rdmaio_setup,
1349         .init                   = fio_rdmaio_init,
1350         .prep                   = fio_rdmaio_prep,
1351         .queue                  = fio_rdmaio_queue,
1352         .commit                 = fio_rdmaio_commit,
1353         .getevents              = fio_rdmaio_getevents,
1354         .event                  = fio_rdmaio_event,
1355         .cleanup                = fio_rdmaio_cleanup,
1356         .open_file              = fio_rdmaio_open_file,
1357         .close_file             = fio_rdmaio_close_file,
1358         .flags                  = FIO_DISKLESSIO | FIO_UNIDIR | FIO_PIPEIO,
1359         .options                = options,
1360         .option_struct_size     = sizeof(struct rdmaio_options),
1361 };
1362
1363 static void fio_init fio_rdmaio_register(void)
1364 {
1365         register_ioengine(&ioengine_rw);
1366 }
1367
1368 static void fio_exit fio_rdmaio_unregister(void)
1369 {
1370         unregister_ioengine(&ioengine_rw);
1371 }