hash: cleanups
[fio.git] / engines / rdma.c
1 /*
2  * RDMA I/O engine
3  *
4  * RDMA I/O engine based on the IB verbs and RDMA/CM user space libraries.
5  * Supports both RDMA memory semantics and channel semantics
6  *   for the InfiniBand, RoCE and iWARP protocols.
7  *
8  * You will need the Linux RDMA software installed, either
9  * from your Linux distributor or directly from openfabrics.org:
10  *
11  * http://www.openfabrics.org/downloads/OFED/
12  *
13  * Exchanging steps of RDMA ioengine control messages:
14  *      1. client side sends test mode (RDMA_WRITE/RDMA_READ/SEND)
15  *         to server side.
16  *      2. server side parses test mode, and sends back confirmation
17  *         to client side. In RDMA WRITE/READ test, this confirmation
18  *         includes memory information, such as rkey, address.
19  *      3. client side initiates test loop.
20  *      4. In RDMA WRITE/READ test, client side sends a completion
21  *         notification to server side. Server side updates its
22  *         td->done as true.
23  *
24  */
25 #include <stdio.h>
26 #include <stdlib.h>
27 #include <unistd.h>
28 #include <errno.h>
29 #include <assert.h>
30 #include <netinet/in.h>
31 #include <arpa/inet.h>
32 #include <netdb.h>
33 #include <poll.h>
34 #include <sys/types.h>
35 #include <sys/socket.h>
36 #include <sys/time.h>
37 #include <sys/resource.h>
38
39 #include <pthread.h>
40 #include <inttypes.h>
41
42 #include "../fio.h"
43 #include "../hash.h"
44 #include "../optgroup.h"
45
46 #include <rdma/rdma_cma.h>
47
48 #define FIO_RDMA_MAX_IO_DEPTH    512
49
50 enum rdma_io_mode {
51         FIO_RDMA_UNKNOWN = 0,
52         FIO_RDMA_MEM_WRITE,
53         FIO_RDMA_MEM_READ,
54         FIO_RDMA_CHA_SEND,
55         FIO_RDMA_CHA_RECV
56 };
57
58 struct rdmaio_options {
59         struct thread_data *td;
60         unsigned int port;
61         enum rdma_io_mode verb;
62         char *bindname;
63 };
64
65 static int str_hostname_cb(void *data, const char *input)
66 {
67         struct rdmaio_options *o = data;
68
69         if (o->td->o.filename)
70                 free(o->td->o.filename);
71         o->td->o.filename = strdup(input);
72         return 0;
73 }
74
75 static struct fio_option options[] = {
76         {
77                 .name   = "hostname",
78                 .lname  = "rdma engine hostname",
79                 .type   = FIO_OPT_STR_STORE,
80                 .cb     = str_hostname_cb,
81                 .help   = "Hostname for RDMA IO engine",
82                 .category = FIO_OPT_C_ENGINE,
83                 .group  = FIO_OPT_G_RDMA,
84         },
85         {
86                 .name   = "bindname",
87                 .lname  = "rdma engine bindname",
88                 .type   = FIO_OPT_STR_STORE,
89                 .off1   = offsetof(struct rdmaio_options, bindname),
90                 .help   = "Bind for RDMA IO engine",
91                 .def    = "",
92                 .category = FIO_OPT_C_ENGINE,
93                 .group  = FIO_OPT_G_RDMA,
94         },
95         {
96                 .name   = "port",
97                 .lname  = "rdma engine port",
98                 .type   = FIO_OPT_INT,
99                 .off1   = offsetof(struct rdmaio_options, port),
100                 .minval = 1,
101                 .maxval = 65535,
102                 .help   = "Port to use for RDMA connections",
103                 .category = FIO_OPT_C_ENGINE,
104                 .group  = FIO_OPT_G_RDMA,
105         },
106         {
107                 .name   = "verb",
108                 .lname  = "RDMA engine verb",
109                 .alias  = "proto",
110                 .type   = FIO_OPT_STR,
111                 .off1   = offsetof(struct rdmaio_options, verb),
112                 .help   = "RDMA engine verb",
113                 .def    = "write",
114                 .posval = {
115                           { .ival = "write",
116                             .oval = FIO_RDMA_MEM_WRITE,
117                             .help = "Memory Write",
118                           },
119                           { .ival = "read",
120                             .oval = FIO_RDMA_MEM_READ,
121                             .help = "Memory Read",
122                           },
123                           { .ival = "send",
124                             .oval = FIO_RDMA_CHA_SEND,
125                             .help = "Posted Send",
126                           },
127                           { .ival = "recv",
128                             .oval = FIO_RDMA_CHA_RECV,
129                             .help = "Posted Receive",
130                           },
131                 },
132                 .category = FIO_OPT_C_ENGINE,
133                 .group  = FIO_OPT_G_RDMA,
134         },
135         {
136                 .name   = NULL,
137         },
138 };
139
140 struct remote_u {
141         uint64_t buf;
142         uint32_t rkey;
143         uint32_t size;
144 };
145
146 struct rdma_info_blk {
147         uint32_t mode;          /* channel semantic or memory semantic */
148         uint32_t nr;            /* client: io depth
149                                    server: number of records for memory semantic
150                                  */
151         uint32_t max_bs;        /* maximum block size */
152         struct remote_u rmt_us[FIO_RDMA_MAX_IO_DEPTH];
153 };
154
155 struct rdma_io_u_data {
156         uint64_t wr_id;
157         struct ibv_send_wr sq_wr;
158         struct ibv_recv_wr rq_wr;
159         struct ibv_sge rdma_sgl;
160 };
161
162 struct rdmaio_data {
163         int is_client;
164         enum rdma_io_mode rdma_protocol;
165         char host[64];
166         struct sockaddr_in addr;
167
168         struct ibv_recv_wr rq_wr;
169         struct ibv_sge recv_sgl;
170         struct rdma_info_blk recv_buf;
171         struct ibv_mr *recv_mr;
172
173         struct ibv_send_wr sq_wr;
174         struct ibv_sge send_sgl;
175         struct rdma_info_blk send_buf;
176         struct ibv_mr *send_mr;
177
178         struct ibv_comp_channel *channel;
179         struct ibv_cq *cq;
180         struct ibv_pd *pd;
181         struct ibv_qp *qp;
182
183         pthread_t cmthread;
184         struct rdma_event_channel *cm_channel;
185         struct rdma_cm_id *cm_id;
186         struct rdma_cm_id *child_cm_id;
187
188         int cq_event_num;
189
190         struct remote_u *rmt_us;
191         int rmt_nr;
192         struct io_u **io_us_queued;
193         int io_u_queued_nr;
194         struct io_u **io_us_flight;
195         int io_u_flight_nr;
196         struct io_u **io_us_completed;
197         int io_u_completed_nr;
198
199         struct frand_state rand_state;
200 };
201
202 static int client_recv(struct thread_data *td, struct ibv_wc *wc)
203 {
204         struct rdmaio_data *rd = td->io_ops_data;
205         unsigned int max_bs;
206
207         if (wc->byte_len != sizeof(rd->recv_buf)) {
208                 log_err("Received bogus data, size %d\n", wc->byte_len);
209                 return 1;
210         }
211
212         max_bs = max(td->o.max_bs[DDIR_READ], td->o.max_bs[DDIR_WRITE]);
213         if (max_bs > ntohl(rd->recv_buf.max_bs)) {
214                 log_err("fio: Server's block size (%d) must be greater than or "
215                         "equal to the client's block size (%d)!\n",
216                         ntohl(rd->recv_buf.max_bs), max_bs);
217                 return 1;
218         }
219
220         /* store mr info for MEMORY semantic */
221         if ((rd->rdma_protocol == FIO_RDMA_MEM_WRITE) ||
222             (rd->rdma_protocol == FIO_RDMA_MEM_READ)) {
223                 /* struct flist_head *entry; */
224                 int i = 0;
225
226                 rd->rmt_nr = ntohl(rd->recv_buf.nr);
227
228                 for (i = 0; i < rd->rmt_nr; i++) {
229                         rd->rmt_us[i].buf = __be64_to_cpu(
230                                                 rd->recv_buf.rmt_us[i].buf);
231                         rd->rmt_us[i].rkey = ntohl(rd->recv_buf.rmt_us[i].rkey);
232                         rd->rmt_us[i].size = ntohl(rd->recv_buf.rmt_us[i].size);
233
234                         dprint(FD_IO,
235                                "fio: Received rkey %x addr %" PRIx64
236                                " len %d from peer\n", rd->rmt_us[i].rkey,
237                                rd->rmt_us[i].buf, rd->rmt_us[i].size);
238                 }
239         }
240
241         return 0;
242 }
243
244 static int server_recv(struct thread_data *td, struct ibv_wc *wc)
245 {
246         struct rdmaio_data *rd = td->io_ops_data;
247         unsigned int max_bs;
248
249         if (wc->wr_id == FIO_RDMA_MAX_IO_DEPTH) {
250                 rd->rdma_protocol = ntohl(rd->recv_buf.mode);
251
252                 /* CHANNEL semantic, do nothing */
253                 if (rd->rdma_protocol == FIO_RDMA_CHA_SEND)
254                         rd->rdma_protocol = FIO_RDMA_CHA_RECV;
255
256                 max_bs = max(td->o.max_bs[DDIR_READ], td->o.max_bs[DDIR_WRITE]);
257                 if (max_bs < ntohl(rd->recv_buf.max_bs)) {
258                         log_err("fio: Server's block size (%d) must be greater than or "
259                                 "equal to the client's block size (%d)!\n",
260                                 ntohl(rd->recv_buf.max_bs), max_bs);
261                         return 1;
262                 }
263
264         }
265
266         return 0;
267 }
268
269 static int cq_event_handler(struct thread_data *td, enum ibv_wc_opcode opcode)
270 {
271         struct rdmaio_data *rd = td->io_ops_data;
272         struct ibv_wc wc;
273         struct rdma_io_u_data *r_io_u_d;
274         int ret;
275         int compevnum = 0;
276         int i;
277
278         while ((ret = ibv_poll_cq(rd->cq, 1, &wc)) == 1) {
279                 ret = 0;
280                 compevnum++;
281
282                 if (wc.status) {
283                         log_err("fio: cq completion status %d(%s)\n",
284                                 wc.status, ibv_wc_status_str(wc.status));
285                         return -1;
286                 }
287
288                 switch (wc.opcode) {
289
290                 case IBV_WC_RECV:
291                         if (rd->is_client == 1)
292                                 ret = client_recv(td, &wc);
293                         else
294                                 ret = server_recv(td, &wc);
295
296                         if (ret)
297                                 return -1;
298
299                         if (wc.wr_id == FIO_RDMA_MAX_IO_DEPTH)
300                                 break;
301
302                         for (i = 0; i < rd->io_u_flight_nr; i++) {
303                                 r_io_u_d = rd->io_us_flight[i]->engine_data;
304
305                                 if (wc.wr_id == r_io_u_d->rq_wr.wr_id) {
306                                         rd->io_us_flight[i]->resid =
307                                             rd->io_us_flight[i]->buflen
308                                             - wc.byte_len;
309
310                                         rd->io_us_flight[i]->error = 0;
311
312                                         rd->io_us_completed[rd->
313                                                             io_u_completed_nr]
314                                             = rd->io_us_flight[i];
315                                         rd->io_u_completed_nr++;
316                                         break;
317                                 }
318                         }
319                         if (i == rd->io_u_flight_nr)
320                                 log_err("fio: recv wr %" PRId64 " not found\n",
321                                         wc.wr_id);
322                         else {
323                                 /* put the last one into middle of the list */
324                                 rd->io_us_flight[i] =
325                                     rd->io_us_flight[rd->io_u_flight_nr - 1];
326                                 rd->io_u_flight_nr--;
327                         }
328
329                         break;
330
331                 case IBV_WC_SEND:
332                 case IBV_WC_RDMA_WRITE:
333                 case IBV_WC_RDMA_READ:
334                         if (wc.wr_id == FIO_RDMA_MAX_IO_DEPTH)
335                                 break;
336
337                         for (i = 0; i < rd->io_u_flight_nr; i++) {
338                                 r_io_u_d = rd->io_us_flight[i]->engine_data;
339
340                                 if (wc.wr_id == r_io_u_d->sq_wr.wr_id) {
341                                         rd->io_us_completed[rd->
342                                                             io_u_completed_nr]
343                                             = rd->io_us_flight[i];
344                                         rd->io_u_completed_nr++;
345                                         break;
346                                 }
347                         }
348                         if (i == rd->io_u_flight_nr)
349                                 log_err("fio: send wr %" PRId64 " not found\n",
350                                         wc.wr_id);
351                         else {
352                                 /* put the last one into middle of the list */
353                                 rd->io_us_flight[i] =
354                                     rd->io_us_flight[rd->io_u_flight_nr - 1];
355                                 rd->io_u_flight_nr--;
356                         }
357
358                         break;
359
360                 default:
361                         log_info("fio: unknown completion event %d\n",
362                                  wc.opcode);
363                         return -1;
364                 }
365                 rd->cq_event_num++;
366         }
367
368         if (ret) {
369                 log_err("fio: poll error %d\n", ret);
370                 return 1;
371         }
372
373         return compevnum;
374 }
375
376 /*
377  * Return -1 for error and 'nr events' for a positive number
378  * of events
379  */
380 static int rdma_poll_wait(struct thread_data *td, enum ibv_wc_opcode opcode)
381 {
382         struct rdmaio_data *rd = td->io_ops_data;
383         struct ibv_cq *ev_cq;
384         void *ev_ctx;
385         int ret;
386
387         if (rd->cq_event_num > 0) {     /* previous left */
388                 rd->cq_event_num--;
389                 return 0;
390         }
391
392 again:
393         if (ibv_get_cq_event(rd->channel, &ev_cq, &ev_ctx) != 0) {
394                 log_err("fio: Failed to get cq event!\n");
395                 return -1;
396         }
397         if (ev_cq != rd->cq) {
398                 log_err("fio: Unknown CQ!\n");
399                 return -1;
400         }
401         if (ibv_req_notify_cq(rd->cq, 0) != 0) {
402                 log_err("fio: Failed to set notify!\n");
403                 return -1;
404         }
405
406         ret = cq_event_handler(td, opcode);
407         if (ret == 0)
408                 goto again;
409
410         ibv_ack_cq_events(rd->cq, ret);
411
412         rd->cq_event_num--;
413
414         return ret;
415 }
416
417 static int fio_rdmaio_setup_qp(struct thread_data *td)
418 {
419         struct rdmaio_data *rd = td->io_ops_data;
420         struct ibv_qp_init_attr init_attr;
421         int qp_depth = td->o.iodepth * 2;       /* 2 times of io depth */
422
423         if (rd->is_client == 0)
424                 rd->pd = ibv_alloc_pd(rd->child_cm_id->verbs);
425         else
426                 rd->pd = ibv_alloc_pd(rd->cm_id->verbs);
427
428         if (rd->pd == NULL) {
429                 log_err("fio: ibv_alloc_pd fail: %m\n");
430                 return 1;
431         }
432
433         if (rd->is_client == 0)
434                 rd->channel = ibv_create_comp_channel(rd->child_cm_id->verbs);
435         else
436                 rd->channel = ibv_create_comp_channel(rd->cm_id->verbs);
437         if (rd->channel == NULL) {
438                 log_err("fio: ibv_create_comp_channel fail: %m\n");
439                 goto err1;
440         }
441
442         if (qp_depth < 16)
443                 qp_depth = 16;
444
445         if (rd->is_client == 0)
446                 rd->cq = ibv_create_cq(rd->child_cm_id->verbs,
447                                        qp_depth, rd, rd->channel, 0);
448         else
449                 rd->cq = ibv_create_cq(rd->cm_id->verbs,
450                                        qp_depth, rd, rd->channel, 0);
451         if (rd->cq == NULL) {
452                 log_err("fio: ibv_create_cq failed: %m\n");
453                 goto err2;
454         }
455
456         if (ibv_req_notify_cq(rd->cq, 0) != 0) {
457                 log_err("fio: ibv_req_notify_cq failed: %m\n");
458                 goto err3;
459         }
460
461         /* create queue pair */
462         memset(&init_attr, 0, sizeof(init_attr));
463         init_attr.cap.max_send_wr = qp_depth;
464         init_attr.cap.max_recv_wr = qp_depth;
465         init_attr.cap.max_recv_sge = 1;
466         init_attr.cap.max_send_sge = 1;
467         init_attr.qp_type = IBV_QPT_RC;
468         init_attr.send_cq = rd->cq;
469         init_attr.recv_cq = rd->cq;
470
471         if (rd->is_client == 0) {
472                 if (rdma_create_qp(rd->child_cm_id, rd->pd, &init_attr) != 0) {
473                         log_err("fio: rdma_create_qp failed: %m\n");
474                         goto err3;
475                 }
476                 rd->qp = rd->child_cm_id->qp;
477         } else {
478                 if (rdma_create_qp(rd->cm_id, rd->pd, &init_attr) != 0) {
479                         log_err("fio: rdma_create_qp failed: %m\n");
480                         goto err3;
481                 }
482                 rd->qp = rd->cm_id->qp;
483         }
484
485         return 0;
486
487 err3:
488         ibv_destroy_cq(rd->cq);
489 err2:
490         ibv_destroy_comp_channel(rd->channel);
491 err1:
492         ibv_dealloc_pd(rd->pd);
493
494         return 1;
495 }
496
497 static int fio_rdmaio_setup_control_msg_buffers(struct thread_data *td)
498 {
499         struct rdmaio_data *rd = td->io_ops_data;
500
501         rd->recv_mr = ibv_reg_mr(rd->pd, &rd->recv_buf, sizeof(rd->recv_buf),
502                                  IBV_ACCESS_LOCAL_WRITE);
503         if (rd->recv_mr == NULL) {
504                 log_err("fio: recv_buf reg_mr failed: %m\n");
505                 return 1;
506         }
507
508         rd->send_mr = ibv_reg_mr(rd->pd, &rd->send_buf, sizeof(rd->send_buf),
509                                  0);
510         if (rd->send_mr == NULL) {
511                 log_err("fio: send_buf reg_mr failed: %m\n");
512                 ibv_dereg_mr(rd->recv_mr);
513                 return 1;
514         }
515
516         /* setup work request */
517         /* recv wq */
518         rd->recv_sgl.addr = (uint64_t) (unsigned long)&rd->recv_buf;
519         rd->recv_sgl.length = sizeof(rd->recv_buf);
520         rd->recv_sgl.lkey = rd->recv_mr->lkey;
521         rd->rq_wr.sg_list = &rd->recv_sgl;
522         rd->rq_wr.num_sge = 1;
523         rd->rq_wr.wr_id = FIO_RDMA_MAX_IO_DEPTH;
524
525         /* send wq */
526         rd->send_sgl.addr = (uint64_t) (unsigned long)&rd->send_buf;
527         rd->send_sgl.length = sizeof(rd->send_buf);
528         rd->send_sgl.lkey = rd->send_mr->lkey;
529
530         rd->sq_wr.opcode = IBV_WR_SEND;
531         rd->sq_wr.send_flags = IBV_SEND_SIGNALED;
532         rd->sq_wr.sg_list = &rd->send_sgl;
533         rd->sq_wr.num_sge = 1;
534         rd->sq_wr.wr_id = FIO_RDMA_MAX_IO_DEPTH;
535
536         return 0;
537 }
538
539 static int get_next_channel_event(struct thread_data *td,
540                                   struct rdma_event_channel *channel,
541                                   enum rdma_cm_event_type wait_event)
542 {
543         struct rdmaio_data *rd = td->io_ops_data;
544         struct rdma_cm_event *event;
545         int ret;
546
547         ret = rdma_get_cm_event(channel, &event);
548         if (ret) {
549                 log_err("fio: rdma_get_cm_event: %d\n", ret);
550                 return 1;
551         }
552
553         if (event->event != wait_event) {
554                 log_err("fio: event is %s instead of %s\n",
555                         rdma_event_str(event->event),
556                         rdma_event_str(wait_event));
557                 return 1;
558         }
559
560         switch (event->event) {
561         case RDMA_CM_EVENT_CONNECT_REQUEST:
562                 rd->child_cm_id = event->id;
563                 break;
564         default:
565                 break;
566         }
567
568         rdma_ack_cm_event(event);
569
570         return 0;
571 }
572
573 static int fio_rdmaio_prep(struct thread_data *td, struct io_u *io_u)
574 {
575         struct rdmaio_data *rd = td->io_ops_data;
576         struct rdma_io_u_data *r_io_u_d;
577
578         r_io_u_d = io_u->engine_data;
579
580         switch (rd->rdma_protocol) {
581         case FIO_RDMA_MEM_WRITE:
582         case FIO_RDMA_MEM_READ:
583                 r_io_u_d->rdma_sgl.addr = (uint64_t) (unsigned long)io_u->buf;
584                 r_io_u_d->rdma_sgl.lkey = io_u->mr->lkey;
585                 r_io_u_d->sq_wr.wr_id = r_io_u_d->wr_id;
586                 r_io_u_d->sq_wr.send_flags = IBV_SEND_SIGNALED;
587                 r_io_u_d->sq_wr.sg_list = &r_io_u_d->rdma_sgl;
588                 r_io_u_d->sq_wr.num_sge = 1;
589                 break;
590         case FIO_RDMA_CHA_SEND:
591                 r_io_u_d->rdma_sgl.addr = (uint64_t) (unsigned long)io_u->buf;
592                 r_io_u_d->rdma_sgl.lkey = io_u->mr->lkey;
593                 r_io_u_d->rdma_sgl.length = io_u->buflen;
594                 r_io_u_d->sq_wr.wr_id = r_io_u_d->wr_id;
595                 r_io_u_d->sq_wr.opcode = IBV_WR_SEND;
596                 r_io_u_d->sq_wr.send_flags = IBV_SEND_SIGNALED;
597                 r_io_u_d->sq_wr.sg_list = &r_io_u_d->rdma_sgl;
598                 r_io_u_d->sq_wr.num_sge = 1;
599                 break;
600         case FIO_RDMA_CHA_RECV:
601                 r_io_u_d->rdma_sgl.addr = (uint64_t) (unsigned long)io_u->buf;
602                 r_io_u_d->rdma_sgl.lkey = io_u->mr->lkey;
603                 r_io_u_d->rdma_sgl.length = io_u->buflen;
604                 r_io_u_d->rq_wr.wr_id = r_io_u_d->wr_id;
605                 r_io_u_d->rq_wr.sg_list = &r_io_u_d->rdma_sgl;
606                 r_io_u_d->rq_wr.num_sge = 1;
607                 break;
608         default:
609                 log_err("fio: unknown rdma protocol - %d\n", rd->rdma_protocol);
610                 break;
611         }
612
613         return 0;
614 }
615
616 static struct io_u *fio_rdmaio_event(struct thread_data *td, int event)
617 {
618         struct rdmaio_data *rd = td->io_ops_data;
619         struct io_u *io_u;
620         int i;
621
622         io_u = rd->io_us_completed[0];
623         for (i = 0; i < rd->io_u_completed_nr - 1; i++)
624                 rd->io_us_completed[i] = rd->io_us_completed[i + 1];
625
626         rd->io_u_completed_nr--;
627
628         dprint_io_u(io_u, "fio_rdmaio_event");
629
630         return io_u;
631 }
632
633 static int fio_rdmaio_getevents(struct thread_data *td, unsigned int min,
634                                 unsigned int max, const struct timespec *t)
635 {
636         struct rdmaio_data *rd = td->io_ops_data;
637         enum ibv_wc_opcode comp_opcode;
638         struct ibv_cq *ev_cq;
639         void *ev_ctx;
640         int ret, r = 0;
641         comp_opcode = IBV_WC_RDMA_WRITE;
642
643         switch (rd->rdma_protocol) {
644         case FIO_RDMA_MEM_WRITE:
645                 comp_opcode = IBV_WC_RDMA_WRITE;
646                 break;
647         case FIO_RDMA_MEM_READ:
648                 comp_opcode = IBV_WC_RDMA_READ;
649                 break;
650         case FIO_RDMA_CHA_SEND:
651                 comp_opcode = IBV_WC_SEND;
652                 break;
653         case FIO_RDMA_CHA_RECV:
654                 comp_opcode = IBV_WC_RECV;
655                 break;
656         default:
657                 log_err("fio: unknown rdma protocol - %d\n", rd->rdma_protocol);
658                 break;
659         }
660
661         if (rd->cq_event_num > 0) {     /* previous left */
662                 rd->cq_event_num--;
663                 return 0;
664         }
665
666 again:
667         if (ibv_get_cq_event(rd->channel, &ev_cq, &ev_ctx) != 0) {
668                 log_err("fio: Failed to get cq event!\n");
669                 return -1;
670         }
671         if (ev_cq != rd->cq) {
672                 log_err("fio: Unknown CQ!\n");
673                 return -1;
674         }
675         if (ibv_req_notify_cq(rd->cq, 0) != 0) {
676                 log_err("fio: Failed to set notify!\n");
677                 return -1;
678         }
679
680         ret = cq_event_handler(td, comp_opcode);
681         if (ret < 1)
682                 goto again;
683
684         ibv_ack_cq_events(rd->cq, ret);
685
686         r += ret;
687         if (r < min)
688                 goto again;
689
690         rd->cq_event_num -= r;
691
692         return r;
693 }
694
695 static int fio_rdmaio_send(struct thread_data *td, struct io_u **io_us,
696                            unsigned int nr)
697 {
698         struct rdmaio_data *rd = td->io_ops_data;
699         struct ibv_send_wr *bad_wr;
700 #if 0
701         enum ibv_wc_opcode comp_opcode;
702         comp_opcode = IBV_WC_RDMA_WRITE;
703 #endif
704         int i;
705         long index;
706         struct rdma_io_u_data *r_io_u_d;
707
708         r_io_u_d = NULL;
709
710         for (i = 0; i < nr; i++) {
711                 /* RDMA_WRITE or RDMA_READ */
712                 switch (rd->rdma_protocol) {
713                 case FIO_RDMA_MEM_WRITE:
714                         /* compose work request */
715                         r_io_u_d = io_us[i]->engine_data;
716                         index = __rand(&rd->rand_state) % rd->rmt_nr;
717                         r_io_u_d->sq_wr.opcode = IBV_WR_RDMA_WRITE;
718                         r_io_u_d->sq_wr.wr.rdma.rkey = rd->rmt_us[index].rkey;
719                         r_io_u_d->sq_wr.wr.rdma.remote_addr = \
720                                 rd->rmt_us[index].buf;
721                         r_io_u_d->sq_wr.sg_list->length = io_us[i]->buflen;
722                         break;
723                 case FIO_RDMA_MEM_READ:
724                         /* compose work request */
725                         r_io_u_d = io_us[i]->engine_data;
726                         index = __rand(&rd->rand_state) % rd->rmt_nr;
727                         r_io_u_d->sq_wr.opcode = IBV_WR_RDMA_READ;
728                         r_io_u_d->sq_wr.wr.rdma.rkey = rd->rmt_us[index].rkey;
729                         r_io_u_d->sq_wr.wr.rdma.remote_addr = \
730                                 rd->rmt_us[index].buf;
731                         r_io_u_d->sq_wr.sg_list->length = io_us[i]->buflen;
732                         break;
733                 case FIO_RDMA_CHA_SEND:
734                         r_io_u_d = io_us[i]->engine_data;
735                         r_io_u_d->sq_wr.opcode = IBV_WR_SEND;
736                         r_io_u_d->sq_wr.send_flags = IBV_SEND_SIGNALED;
737                         break;
738                 default:
739                         log_err("fio: unknown rdma protocol - %d\n",
740                                 rd->rdma_protocol);
741                         break;
742                 }
743
744                 if (ibv_post_send(rd->qp, &r_io_u_d->sq_wr, &bad_wr) != 0) {
745                         log_err("fio: ibv_post_send fail: %m\n");
746                         return -1;
747                 }
748
749                 dprint_io_u(io_us[i], "fio_rdmaio_send");
750         }
751
752         /* wait for completion
753            rdma_poll_wait(td, comp_opcode); */
754
755         return i;
756 }
757
758 static int fio_rdmaio_recv(struct thread_data *td, struct io_u **io_us,
759                            unsigned int nr)
760 {
761         struct rdmaio_data *rd = td->io_ops_data;
762         struct ibv_recv_wr *bad_wr;
763         struct rdma_io_u_data *r_io_u_d;
764         int i;
765
766         i = 0;
767         if (rd->rdma_protocol == FIO_RDMA_CHA_RECV) {
768                 /* post io_u into recv queue */
769                 for (i = 0; i < nr; i++) {
770                         r_io_u_d = io_us[i]->engine_data;
771                         if (ibv_post_recv(rd->qp, &r_io_u_d->rq_wr, &bad_wr) !=
772                             0) {
773                                 log_err("fio: ibv_post_recv fail: %m\n");
774                                 return 1;
775                         }
776                 }
777         } else if ((rd->rdma_protocol == FIO_RDMA_MEM_READ)
778                    || (rd->rdma_protocol == FIO_RDMA_MEM_WRITE)) {
779                 /* re-post the rq_wr */
780                 if (ibv_post_recv(rd->qp, &rd->rq_wr, &bad_wr) != 0) {
781                         log_err("fio: ibv_post_recv fail: %m\n");
782                         return 1;
783                 }
784
785                 rdma_poll_wait(td, IBV_WC_RECV);
786
787                 dprint(FD_IO, "fio: recv FINISH message\n");
788                 td->done = 1;
789                 return 0;
790         }
791
792         return i;
793 }
794
795 static enum fio_q_status fio_rdmaio_queue(struct thread_data *td,
796                                           struct io_u *io_u)
797 {
798         struct rdmaio_data *rd = td->io_ops_data;
799
800         fio_ro_check(td, io_u);
801
802         if (rd->io_u_queued_nr == (int)td->o.iodepth)
803                 return FIO_Q_BUSY;
804
805         rd->io_us_queued[rd->io_u_queued_nr] = io_u;
806         rd->io_u_queued_nr++;
807
808         dprint_io_u(io_u, "fio_rdmaio_queue");
809
810         return FIO_Q_QUEUED;
811 }
812
813 static void fio_rdmaio_queued(struct thread_data *td, struct io_u **io_us,
814                               unsigned int nr)
815 {
816         struct rdmaio_data *rd = td->io_ops_data;
817         struct timespec now;
818         unsigned int i;
819
820         if (!fio_fill_issue_time(td))
821                 return;
822
823         fio_gettime(&now, NULL);
824
825         for (i = 0; i < nr; i++) {
826                 struct io_u *io_u = io_us[i];
827
828                 /* queued -> flight */
829                 rd->io_us_flight[rd->io_u_flight_nr] = io_u;
830                 rd->io_u_flight_nr++;
831
832                 memcpy(&io_u->issue_time, &now, sizeof(now));
833                 io_u_queued(td, io_u);
834         }
835
836         /*
837          * only used for iolog
838          */
839         if (td->o.read_iolog_file)
840                 memcpy(&td->last_issue, &now, sizeof(now));
841 }
842
843 static int fio_rdmaio_commit(struct thread_data *td)
844 {
845         struct rdmaio_data *rd = td->io_ops_data;
846         struct io_u **io_us;
847         int ret;
848
849         if (!rd->io_us_queued)
850                 return 0;
851
852         io_us = rd->io_us_queued;
853         do {
854                 /* RDMA_WRITE or RDMA_READ */
855                 if (rd->is_client)
856                         ret = fio_rdmaio_send(td, io_us, rd->io_u_queued_nr);
857                 else if (!rd->is_client)
858                         ret = fio_rdmaio_recv(td, io_us, rd->io_u_queued_nr);
859                 else
860                         ret = 0;        /* must be a SYNC */
861
862                 if (ret > 0) {
863                         fio_rdmaio_queued(td, io_us, ret);
864                         io_u_mark_submit(td, ret);
865                         rd->io_u_queued_nr -= ret;
866                         io_us += ret;
867                         ret = 0;
868                 } else
869                         break;
870         } while (rd->io_u_queued_nr);
871
872         return ret;
873 }
874
875 static int fio_rdmaio_connect(struct thread_data *td, struct fio_file *f)
876 {
877         struct rdmaio_data *rd = td->io_ops_data;
878         struct rdma_conn_param conn_param;
879         struct ibv_send_wr *bad_wr;
880
881         memset(&conn_param, 0, sizeof(conn_param));
882         conn_param.responder_resources = 1;
883         conn_param.initiator_depth = 1;
884         conn_param.retry_count = 10;
885
886         if (rdma_connect(rd->cm_id, &conn_param) != 0) {
887                 log_err("fio: rdma_connect fail: %m\n");
888                 return 1;
889         }
890
891         if (get_next_channel_event
892             (td, rd->cm_channel, RDMA_CM_EVENT_ESTABLISHED) != 0) {
893                 log_err("fio: wait for RDMA_CM_EVENT_ESTABLISHED\n");
894                 return 1;
895         }
896
897         /* send task request */
898         rd->send_buf.mode = htonl(rd->rdma_protocol);
899         rd->send_buf.nr = htonl(td->o.iodepth);
900
901         if (ibv_post_send(rd->qp, &rd->sq_wr, &bad_wr) != 0) {
902                 log_err("fio: ibv_post_send fail: %m\n");
903                 return 1;
904         }
905
906         if (rdma_poll_wait(td, IBV_WC_SEND) < 0)
907                 return 1;
908
909         /* wait for remote MR info from server side */
910         if (rdma_poll_wait(td, IBV_WC_RECV) < 0)
911                 return 1;
912
913         /* In SEND/RECV test, it's a good practice to setup the iodepth of
914          * of the RECV side deeper than that of the SEND side to
915          * avoid RNR (receiver not ready) error. The
916          * SEND side may send so many unsolicited message before
917          * RECV side commits sufficient recv buffers into recv queue.
918          * This may lead to RNR error. Here, SEND side pauses for a while
919          * during which RECV side commits sufficient recv buffers.
920          */
921         usleep(500000);
922
923         return 0;
924 }
925
926 static int fio_rdmaio_accept(struct thread_data *td, struct fio_file *f)
927 {
928         struct rdmaio_data *rd = td->io_ops_data;
929         struct rdma_conn_param conn_param;
930         struct ibv_send_wr *bad_wr;
931         int ret = 0;
932
933         /* rdma_accept() - then wait for accept success */
934         memset(&conn_param, 0, sizeof(conn_param));
935         conn_param.responder_resources = 1;
936         conn_param.initiator_depth = 1;
937
938         if (rdma_accept(rd->child_cm_id, &conn_param) != 0) {
939                 log_err("fio: rdma_accept: %m\n");
940                 return 1;
941         }
942
943         if (get_next_channel_event
944             (td, rd->cm_channel, RDMA_CM_EVENT_ESTABLISHED) != 0) {
945                 log_err("fio: wait for RDMA_CM_EVENT_ESTABLISHED\n");
946                 return 1;
947         }
948
949         /* wait for request */
950         ret = rdma_poll_wait(td, IBV_WC_RECV) < 0;
951
952         if (ibv_post_send(rd->qp, &rd->sq_wr, &bad_wr) != 0) {
953                 log_err("fio: ibv_post_send fail: %m\n");
954                 return 1;
955         }
956
957         if (rdma_poll_wait(td, IBV_WC_SEND) < 0)
958                 return 1;
959
960         return ret;
961 }
962
963 static int fio_rdmaio_open_file(struct thread_data *td, struct fio_file *f)
964 {
965         if (td_read(td))
966                 return fio_rdmaio_accept(td, f);
967         else
968                 return fio_rdmaio_connect(td, f);
969 }
970
971 static int fio_rdmaio_close_file(struct thread_data *td, struct fio_file *f)
972 {
973         struct rdmaio_data *rd = td->io_ops_data;
974         struct ibv_send_wr *bad_wr;
975
976         /* unregister rdma buffer */
977
978         /*
979          * Client sends notification to the server side
980          */
981         /* refer to: http://linux.die.net/man/7/rdma_cm */
982         if ((rd->is_client == 1) && ((rd->rdma_protocol == FIO_RDMA_MEM_WRITE)
983                                      || (rd->rdma_protocol ==
984                                          FIO_RDMA_MEM_READ))) {
985                 if (ibv_post_send(rd->qp, &rd->sq_wr, &bad_wr) != 0) {
986                         log_err("fio: ibv_post_send fail: %m\n");
987                         return 1;
988                 }
989
990                 dprint(FD_IO, "fio: close information sent success\n");
991                 rdma_poll_wait(td, IBV_WC_SEND);
992         }
993
994         if (rd->is_client == 1)
995                 rdma_disconnect(rd->cm_id);
996         else {
997                 rdma_disconnect(rd->child_cm_id);
998 #if 0
999                 rdma_disconnect(rd->cm_id);
1000 #endif
1001         }
1002
1003 #if 0
1004         if (get_next_channel_event(td, rd->cm_channel, RDMA_CM_EVENT_DISCONNECTED) != 0) {
1005                 log_err("fio: wait for RDMA_CM_EVENT_DISCONNECTED\n");
1006                 return 1;
1007         }
1008 #endif
1009
1010         ibv_destroy_cq(rd->cq);
1011         ibv_destroy_qp(rd->qp);
1012
1013         if (rd->is_client == 1)
1014                 rdma_destroy_id(rd->cm_id);
1015         else {
1016                 rdma_destroy_id(rd->child_cm_id);
1017                 rdma_destroy_id(rd->cm_id);
1018         }
1019
1020         ibv_destroy_comp_channel(rd->channel);
1021         ibv_dealloc_pd(rd->pd);
1022
1023         return 0;
1024 }
1025
1026 static int aton(struct thread_data *td, const char *host,
1027                      struct sockaddr_in *addr)
1028 {
1029         if (inet_aton(host, &addr->sin_addr) != 1) {
1030                 struct hostent *hent;
1031
1032                 hent = gethostbyname(host);
1033                 if (!hent) {
1034                         td_verror(td, errno, "gethostbyname");
1035                         return 1;
1036                 }
1037
1038                 memcpy(&addr->sin_addr, hent->h_addr, 4);
1039         }
1040         return 0;
1041 }
1042
1043 static int fio_rdmaio_setup_connect(struct thread_data *td, const char *host,
1044                                     unsigned short port)
1045 {
1046         struct rdmaio_data *rd = td->io_ops_data;
1047         struct rdmaio_options *o = td->eo;
1048         struct sockaddr_storage addrb;
1049         struct ibv_recv_wr *bad_wr;
1050         int err;
1051
1052         rd->addr.sin_family = AF_INET;
1053         rd->addr.sin_port = htons(port);
1054
1055         err = aton(td, host, &rd->addr);
1056         if (err)
1057                 return err;
1058
1059         /* resolve route */
1060         if (o->bindname && strlen(o->bindname)) {
1061                 addrb.ss_family = AF_INET;
1062                 err = aton(td, o->bindname, (struct sockaddr_in *)&addrb);
1063                 if (err)
1064                         return err;
1065                 err = rdma_resolve_addr(rd->cm_id, (struct sockaddr *)&addrb,
1066                                         (struct sockaddr *)&rd->addr, 2000);
1067
1068         } else {
1069                 err = rdma_resolve_addr(rd->cm_id, NULL,
1070                                         (struct sockaddr *)&rd->addr, 2000);
1071         }
1072
1073         if (err != 0) {
1074                 log_err("fio: rdma_resolve_addr: %d\n", err);
1075                 return 1;
1076         }
1077
1078         err = get_next_channel_event(td, rd->cm_channel, RDMA_CM_EVENT_ADDR_RESOLVED);
1079         if (err != 0) {
1080                 log_err("fio: get_next_channel_event: %d\n", err);
1081                 return 1;
1082         }
1083
1084         /* resolve route */
1085         err = rdma_resolve_route(rd->cm_id, 2000);
1086         if (err != 0) {
1087                 log_err("fio: rdma_resolve_route: %d\n", err);
1088                 return 1;
1089         }
1090
1091         err = get_next_channel_event(td, rd->cm_channel, RDMA_CM_EVENT_ROUTE_RESOLVED);
1092         if (err != 0) {
1093                 log_err("fio: get_next_channel_event: %d\n", err);
1094                 return 1;
1095         }
1096
1097         /* create qp and buffer */
1098         if (fio_rdmaio_setup_qp(td) != 0)
1099                 return 1;
1100
1101         if (fio_rdmaio_setup_control_msg_buffers(td) != 0)
1102                 return 1;
1103
1104         /* post recv buf */
1105         err = ibv_post_recv(rd->qp, &rd->rq_wr, &bad_wr);
1106         if (err != 0) {
1107                 log_err("fio: ibv_post_recv fail: %d\n", err);
1108                 return 1;
1109         }
1110
1111         return 0;
1112 }
1113
1114 static int fio_rdmaio_setup_listen(struct thread_data *td, short port)
1115 {
1116         struct rdmaio_data *rd = td->io_ops_data;
1117         struct rdmaio_options *o = td->eo;
1118         struct ibv_recv_wr *bad_wr;
1119         int state = td->runstate;
1120
1121         td_set_runstate(td, TD_SETTING_UP);
1122
1123         rd->addr.sin_family = AF_INET;
1124         rd->addr.sin_port = htons(port);
1125
1126         if (!o->bindname || !strlen(o->bindname))
1127                 rd->addr.sin_addr.s_addr = htonl(INADDR_ANY);
1128         else
1129                 rd->addr.sin_addr.s_addr = htonl(*o->bindname);
1130
1131         /* rdma_listen */
1132         if (rdma_bind_addr(rd->cm_id, (struct sockaddr *)&rd->addr) != 0) {
1133                 log_err("fio: rdma_bind_addr fail: %m\n");
1134                 return 1;
1135         }
1136
1137         if (rdma_listen(rd->cm_id, 3) != 0) {
1138                 log_err("fio: rdma_listen fail: %m\n");
1139                 return 1;
1140         }
1141
1142         log_info("fio: waiting for connection\n");
1143
1144         /* wait for CONNECT_REQUEST */
1145         if (get_next_channel_event
1146             (td, rd->cm_channel, RDMA_CM_EVENT_CONNECT_REQUEST) != 0) {
1147                 log_err("fio: wait for RDMA_CM_EVENT_CONNECT_REQUEST\n");
1148                 return 1;
1149         }
1150
1151         if (fio_rdmaio_setup_qp(td) != 0)
1152                 return 1;
1153
1154         if (fio_rdmaio_setup_control_msg_buffers(td) != 0)
1155                 return 1;
1156
1157         /* post recv buf */
1158         if (ibv_post_recv(rd->qp, &rd->rq_wr, &bad_wr) != 0) {
1159                 log_err("fio: ibv_post_recv fail: %m\n");
1160                 return 1;
1161         }
1162
1163         td_set_runstate(td, state);
1164         return 0;
1165 }
1166
1167 static int check_set_rlimits(struct thread_data *td)
1168 {
1169 #ifdef CONFIG_RLIMIT_MEMLOCK
1170         struct rlimit rl;
1171
1172         /* check RLIMIT_MEMLOCK */
1173         if (getrlimit(RLIMIT_MEMLOCK, &rl) != 0) {
1174                 log_err("fio: getrlimit fail: %d(%s)\n",
1175                         errno, strerror(errno));
1176                 return 1;
1177         }
1178
1179         /* soft limit */
1180         if ((rl.rlim_cur != RLIM_INFINITY)
1181             && (rl.rlim_cur < td->orig_buffer_size)) {
1182                 log_err("fio: soft RLIMIT_MEMLOCK is: %" PRId64 "\n",
1183                         rl.rlim_cur);
1184                 log_err("fio: total block size is:    %zd\n",
1185                         td->orig_buffer_size);
1186                 /* try to set larger RLIMIT_MEMLOCK */
1187                 rl.rlim_cur = rl.rlim_max;
1188                 if (setrlimit(RLIMIT_MEMLOCK, &rl) != 0) {
1189                         log_err("fio: setrlimit fail: %d(%s)\n",
1190                                 errno, strerror(errno));
1191                         log_err("fio: you may try enlarge MEMLOCK by root\n");
1192                         log_err("# ulimit -l unlimited\n");
1193                         return 1;
1194                 }
1195         }
1196 #endif
1197
1198         return 0;
1199 }
1200
1201 static int compat_options(struct thread_data *td)
1202 {
1203         // The original RDMA engine had an ugly / separator
1204         // on the filename for it's options. This function
1205         // retains backwards compatibility with it. Note we do not
1206         // support setting the bindname option is this legacy mode.
1207
1208         struct rdmaio_options *o = td->eo;
1209         char *modep, *portp;
1210         char *filename = td->o.filename;
1211
1212         if (!filename)
1213                 return 0;
1214
1215         portp = strchr(filename, '/');
1216         if (portp == NULL)
1217                 return 0;
1218
1219         *portp = '\0';
1220         portp++;
1221
1222         o->port = strtol(portp, NULL, 10);
1223         if (!o->port || o->port > 65535)
1224                 goto bad_host;
1225
1226         modep = strchr(portp, '/');
1227         if (modep != NULL) {
1228                 *modep = '\0';
1229                 modep++;
1230         }
1231
1232         if (modep) {
1233                 if (!strncmp("rdma_write", modep, strlen(modep)) ||
1234                     !strncmp("RDMA_WRITE", modep, strlen(modep)))
1235                         o->verb = FIO_RDMA_MEM_WRITE;
1236                 else if (!strncmp("rdma_read", modep, strlen(modep)) ||
1237                          !strncmp("RDMA_READ", modep, strlen(modep)))
1238                         o->verb = FIO_RDMA_MEM_READ;
1239                 else if (!strncmp("send", modep, strlen(modep)) ||
1240                          !strncmp("SEND", modep, strlen(modep)))
1241                         o->verb = FIO_RDMA_CHA_SEND;
1242                 else
1243                         goto bad_host;
1244         } else
1245                 o->verb = FIO_RDMA_MEM_WRITE;
1246
1247
1248         return 0;
1249
1250 bad_host:
1251         log_err("fio: bad rdma host/port/protocol: %s\n", td->o.filename);
1252         return 1;
1253 }
1254
1255 static int fio_rdmaio_init(struct thread_data *td)
1256 {
1257         struct rdmaio_data *rd = td->io_ops_data;
1258         struct rdmaio_options *o = td->eo;
1259         int ret;
1260
1261         if (td_rw(td)) {
1262                 log_err("fio: rdma connections must be read OR write\n");
1263                 return 1;
1264         }
1265         if (td_random(td)) {
1266                 log_err("fio: RDMA network IO can't be random\n");
1267                 return 1;
1268         }
1269
1270         if (compat_options(td))
1271                 return 1;
1272
1273         if (!o->port) {
1274                 log_err("fio: no port has been specified which is required "
1275                         "for the rdma engine\n");
1276                 return 1;
1277         }
1278
1279         if (check_set_rlimits(td))
1280                 return 1;
1281
1282         rd->rdma_protocol = o->verb;
1283         rd->cq_event_num = 0;
1284
1285         rd->cm_channel = rdma_create_event_channel();
1286         if (!rd->cm_channel) {
1287                 log_err("fio: rdma_create_event_channel fail: %m\n");
1288                 return 1;
1289         }
1290
1291         ret = rdma_create_id(rd->cm_channel, &rd->cm_id, rd, RDMA_PS_TCP);
1292         if (ret) {
1293                 log_err("fio: rdma_create_id fail: %m\n");
1294                 return 1;
1295         }
1296
1297         if ((rd->rdma_protocol == FIO_RDMA_MEM_WRITE) ||
1298             (rd->rdma_protocol == FIO_RDMA_MEM_READ)) {
1299                 rd->rmt_us =
1300                         malloc(FIO_RDMA_MAX_IO_DEPTH * sizeof(struct remote_u));
1301                 memset(rd->rmt_us, 0,
1302                         FIO_RDMA_MAX_IO_DEPTH * sizeof(struct remote_u));
1303                 rd->rmt_nr = 0;
1304         }
1305
1306         rd->io_us_queued = malloc(td->o.iodepth * sizeof(struct io_u *));
1307         memset(rd->io_us_queued, 0, td->o.iodepth * sizeof(struct io_u *));
1308         rd->io_u_queued_nr = 0;
1309
1310         rd->io_us_flight = malloc(td->o.iodepth * sizeof(struct io_u *));
1311         memset(rd->io_us_flight, 0, td->o.iodepth * sizeof(struct io_u *));
1312         rd->io_u_flight_nr = 0;
1313
1314         rd->io_us_completed = malloc(td->o.iodepth * sizeof(struct io_u *));
1315         memset(rd->io_us_completed, 0, td->o.iodepth * sizeof(struct io_u *));
1316         rd->io_u_completed_nr = 0;
1317
1318         if (td_read(td)) {      /* READ as the server */
1319                 rd->is_client = 0;
1320                 td->flags |= TD_F_NO_PROGRESS;
1321                 /* server rd->rdma_buf_len will be setup after got request */
1322                 ret = fio_rdmaio_setup_listen(td, o->port);
1323         } else {                /* WRITE as the client */
1324                 rd->is_client = 1;
1325                 ret = fio_rdmaio_setup_connect(td, td->o.filename, o->port);
1326         }
1327         return ret;
1328 }
1329 static int fio_rdmaio_post_init(struct thread_data *td)
1330 {
1331         unsigned int max_bs;
1332         int i;
1333         struct rdmaio_data *rd = td->io_ops_data;
1334
1335         max_bs = max(td->o.max_bs[DDIR_READ], td->o.max_bs[DDIR_WRITE]);
1336         rd->send_buf.max_bs = htonl(max_bs);
1337
1338         /* register each io_u in the free list */
1339         for (i = 0; i < td->io_u_freelist.nr; i++) {
1340                 struct io_u *io_u = td->io_u_freelist.io_us[i];
1341
1342                 io_u->engine_data = malloc(sizeof(struct rdma_io_u_data));
1343                 memset(io_u->engine_data, 0, sizeof(struct rdma_io_u_data));
1344                 ((struct rdma_io_u_data *)io_u->engine_data)->wr_id = i;
1345
1346                 io_u->mr = ibv_reg_mr(rd->pd, io_u->buf, max_bs,
1347                                       IBV_ACCESS_LOCAL_WRITE |
1348                                       IBV_ACCESS_REMOTE_READ |
1349                                       IBV_ACCESS_REMOTE_WRITE);
1350                 if (io_u->mr == NULL) {
1351                         log_err("fio: ibv_reg_mr io_u failed: %m\n");
1352                         return 1;
1353                 }
1354
1355                 rd->send_buf.rmt_us[i].buf =
1356                     cpu_to_be64((uint64_t) (unsigned long)io_u->buf);
1357                 rd->send_buf.rmt_us[i].rkey = htonl(io_u->mr->rkey);
1358                 rd->send_buf.rmt_us[i].size = htonl(max_bs);
1359
1360 #if 0
1361                 log_info("fio: Send rkey %x addr %" PRIx64 " len %d to client\n", io_u->mr->rkey, io_u->buf, max_bs); */
1362 #endif
1363         }
1364
1365         rd->send_buf.nr = htonl(i);
1366
1367         return 0;
1368 }
1369
1370 static void fio_rdmaio_cleanup(struct thread_data *td)
1371 {
1372         struct rdmaio_data *rd = td->io_ops_data;
1373
1374         if (rd)
1375                 free(rd);
1376 }
1377
1378 static int fio_rdmaio_setup(struct thread_data *td)
1379 {
1380         struct rdmaio_data *rd;
1381
1382         if (!td->files_index) {
1383                 add_file(td, td->o.filename ?: "rdma", 0, 0);
1384                 td->o.nr_files = td->o.nr_files ?: 1;
1385                 td->o.open_files++;
1386         }
1387
1388         if (!td->io_ops_data) {
1389                 rd = malloc(sizeof(*rd));
1390
1391                 memset(rd, 0, sizeof(*rd));
1392                 init_rand_seed(&rd->rand_state, (unsigned int) GOLDEN_RATIO_64, 0);
1393                 td->io_ops_data = rd;
1394         }
1395
1396         return 0;
1397 }
1398
1399 FIO_STATIC struct ioengine_ops ioengine = {
1400         .name                   = "rdma",
1401         .version                = FIO_IOOPS_VERSION,
1402         .setup                  = fio_rdmaio_setup,
1403         .init                   = fio_rdmaio_init,
1404         .post_init              = fio_rdmaio_post_init,
1405         .prep                   = fio_rdmaio_prep,
1406         .queue                  = fio_rdmaio_queue,
1407         .commit                 = fio_rdmaio_commit,
1408         .getevents              = fio_rdmaio_getevents,
1409         .event                  = fio_rdmaio_event,
1410         .cleanup                = fio_rdmaio_cleanup,
1411         .open_file              = fio_rdmaio_open_file,
1412         .close_file             = fio_rdmaio_close_file,
1413         .flags                  = FIO_DISKLESSIO | FIO_UNIDIR | FIO_PIPEIO |
1414                                         FIO_ASYNCIO_SETS_ISSUE_TIME,
1415         .options                = options,
1416         .option_struct_size     = sizeof(struct rdmaio_options),
1417 };
1418
1419 static void fio_init fio_rdmaio_register(void)
1420 {
1421         register_ioengine(&ioengine);
1422 }
1423
1424 static void fio_exit fio_rdmaio_unregister(void)
1425 {
1426         unregister_ioengine(&ioengine);
1427 }