buffer: only set refill_buffers, it it wasn't set manually
[fio.git] / engines / rdma.c
1 /*
2  * RDMA I/O engine
3  *
4  * RDMA I/O engine based on the IB verbs and RDMA/CM user space libraries.
5  * Supports both RDMA memory semantics and channel semantics
6  *   for the InfiniBand, RoCE and iWARP protocols.
7  *
8  * You will need the Linux RDMA software installed, either
9  * from your Linux distributor or directly from openfabrics.org:
10  *
11  * http://www.openfabrics.org/downloads/OFED/
12  *
13  * Exchanging steps of RDMA ioengine control messages:
14  *      1. client side sends test mode (RDMA_WRITE/RDMA_READ/SEND)
15  *         to server side.
16  *      2. server side parses test mode, and sends back confirmation
17  *         to client side. In RDMA WRITE/READ test, this confirmation
18  *         includes memory information, such as rkey, address.
19  *      3. client side initiates test loop.
20  *      4. In RDMA WRITE/READ test, client side sends a completion
21  *         notification to server side. Server side updates its
22  *         td->done as true.
23  *
24  */
25 #include <stdio.h>
26 #include <stdlib.h>
27 #include <unistd.h>
28 #include <errno.h>
29 #include <assert.h>
30 #include <netinet/in.h>
31 #include <arpa/inet.h>
32 #include <netdb.h>
33 #include <sys/poll.h>
34 #include <sys/types.h>
35 #include <sys/socket.h>
36 #include <sys/time.h>
37 #include <sys/resource.h>
38
39 #include <pthread.h>
40 #include <inttypes.h>
41
42 #include "../fio.h"
43 #include "../hash.h"
44
45 #include <rdma/rdma_cma.h>
46 #include <infiniband/arch.h>
47
48 #define FIO_RDMA_MAX_IO_DEPTH    512
49
50 enum rdma_io_mode {
51         FIO_RDMA_UNKNOWN = 0,
52         FIO_RDMA_MEM_WRITE,
53         FIO_RDMA_MEM_READ,
54         FIO_RDMA_CHA_SEND,
55         FIO_RDMA_CHA_RECV
56 };
57
58 struct rdmaio_options {
59         struct thread_data *td;
60         unsigned int port;
61         enum rdma_io_mode verb;
62 };
63
64 static int str_hostname_cb(void *data, const char *input)
65 {
66         struct rdmaio_options *o = data;
67
68         if (o->td->o.filename)
69                 free(o->td->o.filename);
70         o->td->o.filename = strdup(input);
71         return 0;
72 }
73
74 static struct fio_option options[] = {
75         {
76                 .name   = "hostname",
77                 .lname  = "rdma engine hostname",
78                 .type   = FIO_OPT_STR_STORE,
79                 .cb     = str_hostname_cb,
80                 .help   = "Hostname for RDMA IO engine",
81                 .category = FIO_OPT_C_ENGINE,
82                 .group  = FIO_OPT_G_RDMA,
83         },
84         {
85                 .name   = "port",
86                 .lname  = "rdma engine port",
87                 .type   = FIO_OPT_INT,
88                 .off1   = offsetof(struct rdmaio_options, port),
89                 .minval = 1,
90                 .maxval = 65535,
91                 .help   = "Port to use for RDMA connections",
92                 .category = FIO_OPT_C_ENGINE,
93                 .group  = FIO_OPT_G_RDMA,
94         },
95         {
96                 .name   = "verb",
97                 .lname  = "RDMA engine verb",
98                 .alias  = "proto",
99                 .type   = FIO_OPT_STR,
100                 .off1   = offsetof(struct rdmaio_options, verb),
101                 .help   = "RDMA engine verb",
102                 .def    = "write",
103                 .posval = {
104                           { .ival = "write",
105                             .oval = FIO_RDMA_MEM_WRITE,
106                             .help = "Memory Write",
107                           },
108                           { .ival = "read",
109                             .oval = FIO_RDMA_MEM_READ,
110                             .help = "Memory Read",
111                           },
112                           { .ival = "send",
113                             .oval = FIO_RDMA_CHA_SEND,
114                             .help = "Posted Send",
115                           },
116                           { .ival = "recv",
117                             .oval = FIO_RDMA_CHA_RECV,
118                             .help = "Posted Recieve",
119                           },
120                 },
121                 .category = FIO_OPT_C_ENGINE,
122                 .group  = FIO_OPT_G_RDMA,
123         },
124         {
125                 .name   = NULL,
126         },
127 };
128
129 struct remote_u {
130         uint64_t buf;
131         uint32_t rkey;
132         uint32_t size;
133 };
134
135 struct rdma_info_blk {
136         uint32_t mode;          /* channel semantic or memory semantic */
137         uint32_t nr;            /* client: io depth
138                                    server: number of records for memory semantic
139                                  */
140         struct remote_u rmt_us[FIO_RDMA_MAX_IO_DEPTH];
141 };
142
143 struct rdma_io_u_data {
144         uint64_t wr_id;
145         struct ibv_send_wr sq_wr;
146         struct ibv_recv_wr rq_wr;
147         struct ibv_sge rdma_sgl;
148 };
149
150 struct rdmaio_data {
151         int is_client;
152         enum rdma_io_mode rdma_protocol;
153         char host[64];
154         struct sockaddr_in addr;
155
156         struct ibv_recv_wr rq_wr;
157         struct ibv_sge recv_sgl;
158         struct rdma_info_blk recv_buf;
159         struct ibv_mr *recv_mr;
160
161         struct ibv_send_wr sq_wr;
162         struct ibv_sge send_sgl;
163         struct rdma_info_blk send_buf;
164         struct ibv_mr *send_mr;
165
166         struct ibv_comp_channel *channel;
167         struct ibv_cq *cq;
168         struct ibv_pd *pd;
169         struct ibv_qp *qp;
170
171         pthread_t cmthread;
172         struct rdma_event_channel *cm_channel;
173         struct rdma_cm_id *cm_id;
174         struct rdma_cm_id *child_cm_id;
175
176         int cq_event_num;
177
178         struct remote_u *rmt_us;
179         int rmt_nr;
180         struct io_u **io_us_queued;
181         int io_u_queued_nr;
182         struct io_u **io_us_flight;
183         int io_u_flight_nr;
184         struct io_u **io_us_completed;
185         int io_u_completed_nr;
186
187         struct frand_state rand_state;
188 };
189
190 static int client_recv(struct thread_data *td, struct ibv_wc *wc)
191 {
192         struct rdmaio_data *rd = td->io_ops->data;
193
194         if (wc->byte_len != sizeof(rd->recv_buf)) {
195                 log_err("Received bogus data, size %d\n", wc->byte_len);
196                 return 1;
197         }
198
199         /* store mr info for MEMORY semantic */
200         if ((rd->rdma_protocol == FIO_RDMA_MEM_WRITE) ||
201             (rd->rdma_protocol == FIO_RDMA_MEM_READ)) {
202                 /* struct flist_head *entry; */
203                 int i = 0;
204
205                 rd->rmt_nr = ntohl(rd->recv_buf.nr);
206
207                 for (i = 0; i < rd->rmt_nr; i++) {
208                         rd->rmt_us[i].buf = ntohll(rd->recv_buf.rmt_us[i].buf);
209                         rd->rmt_us[i].rkey = ntohl(rd->recv_buf.rmt_us[i].rkey);
210                         rd->rmt_us[i].size = ntohl(rd->recv_buf.rmt_us[i].size);
211
212                         dprint(FD_IO,
213                                "fio: Received rkey %x addr %" PRIx64
214                                " len %d from peer\n", rd->rmt_us[i].rkey,
215                                rd->rmt_us[i].buf, rd->rmt_us[i].size);
216                 }
217         }
218
219         return 0;
220 }
221
222 static int server_recv(struct thread_data *td, struct ibv_wc *wc)
223 {
224         struct rdmaio_data *rd = td->io_ops->data;
225
226         if (wc->wr_id == FIO_RDMA_MAX_IO_DEPTH) {
227                 rd->rdma_protocol = ntohl(rd->recv_buf.mode);
228
229                 /* CHANNEL semantic, do nothing */
230                 if (rd->rdma_protocol == FIO_RDMA_CHA_SEND)
231                         rd->rdma_protocol = FIO_RDMA_CHA_RECV;
232         }
233
234         return 0;
235 }
236
237 static int cq_event_handler(struct thread_data *td, enum ibv_wc_opcode opcode)
238 {
239         struct rdmaio_data *rd = td->io_ops->data;
240         struct ibv_wc wc;
241         struct rdma_io_u_data *r_io_u_d;
242         int ret;
243         int compevnum = 0;
244         int i;
245
246         while ((ret = ibv_poll_cq(rd->cq, 1, &wc)) == 1) {
247                 ret = 0;
248                 compevnum++;
249
250                 if (wc.status) {
251                         log_err("fio: cq completion status %d(%s)\n",
252                                 wc.status, ibv_wc_status_str(wc.status));
253                         return -1;
254                 }
255
256                 switch (wc.opcode) {
257
258                 case IBV_WC_RECV:
259                         if (rd->is_client == 1)
260                                 client_recv(td, &wc);
261                         else
262                                 server_recv(td, &wc);
263
264                         if (wc.wr_id == FIO_RDMA_MAX_IO_DEPTH)
265                                 break;
266
267                         for (i = 0; i < rd->io_u_flight_nr; i++) {
268                                 r_io_u_d = rd->io_us_flight[i]->engine_data;
269
270                                 if (wc.wr_id == r_io_u_d->rq_wr.wr_id) {
271                                         rd->io_us_flight[i]->resid =
272                                             rd->io_us_flight[i]->buflen
273                                             - wc.byte_len;
274
275                                         rd->io_us_flight[i]->error = 0;
276
277                                         rd->io_us_completed[rd->
278                                                             io_u_completed_nr]
279                                             = rd->io_us_flight[i];
280                                         rd->io_u_completed_nr++;
281                                         break;
282                                 }
283                         }
284                         if (i == rd->io_u_flight_nr)
285                                 log_err("fio: recv wr %" PRId64 " not found\n",
286                                         wc.wr_id);
287                         else {
288                                 /* put the last one into middle of the list */
289                                 rd->io_us_flight[i] =
290                                     rd->io_us_flight[rd->io_u_flight_nr - 1];
291                                 rd->io_u_flight_nr--;
292                         }
293
294                         break;
295
296                 case IBV_WC_SEND:
297                 case IBV_WC_RDMA_WRITE:
298                 case IBV_WC_RDMA_READ:
299                         if (wc.wr_id == FIO_RDMA_MAX_IO_DEPTH)
300                                 break;
301
302                         for (i = 0; i < rd->io_u_flight_nr; i++) {
303                                 r_io_u_d = rd->io_us_flight[i]->engine_data;
304
305                                 if (wc.wr_id == r_io_u_d->sq_wr.wr_id) {
306                                         rd->io_us_completed[rd->
307                                                             io_u_completed_nr]
308                                             = rd->io_us_flight[i];
309                                         rd->io_u_completed_nr++;
310                                         break;
311                                 }
312                         }
313                         if (i == rd->io_u_flight_nr)
314                                 log_err("fio: send wr %" PRId64 " not found\n",
315                                         wc.wr_id);
316                         else {
317                                 /* put the last one into middle of the list */
318                                 rd->io_us_flight[i] =
319                                     rd->io_us_flight[rd->io_u_flight_nr - 1];
320                                 rd->io_u_flight_nr--;
321                         }
322
323                         break;
324
325                 default:
326                         log_info("fio: unknown completion event %d\n",
327                                  wc.opcode);
328                         return -1;
329                 }
330                 rd->cq_event_num++;
331         }
332         if (ret) {
333                 log_err("fio: poll error %d\n", ret);
334                 return 1;
335         }
336
337         return compevnum;
338 }
339
340 /*
341  * Return -1 for error and 'nr events' for a positive number
342  * of events
343  */
344 static int rdma_poll_wait(struct thread_data *td, enum ibv_wc_opcode opcode)
345 {
346         struct rdmaio_data *rd = td->io_ops->data;
347         struct ibv_cq *ev_cq;
348         void *ev_ctx;
349         int ret;
350
351         if (rd->cq_event_num > 0) {     /* previous left */
352                 rd->cq_event_num--;
353                 return 0;
354         }
355
356 again:
357         if (ibv_get_cq_event(rd->channel, &ev_cq, &ev_ctx) != 0) {
358                 log_err("fio: Failed to get cq event!\n");
359                 return -1;
360         }
361         if (ev_cq != rd->cq) {
362                 log_err("fio: Unknown CQ!\n");
363                 return -1;
364         }
365         if (ibv_req_notify_cq(rd->cq, 0) != 0) {
366                 log_err("fio: Failed to set notify!\n");
367                 return -1;
368         }
369
370         ret = cq_event_handler(td, opcode);
371         if (ret < 1)
372                 goto again;
373
374         ibv_ack_cq_events(rd->cq, ret);
375
376         rd->cq_event_num--;
377
378         return ret;
379 }
380
381 static int fio_rdmaio_setup_qp(struct thread_data *td)
382 {
383         struct rdmaio_data *rd = td->io_ops->data;
384         struct ibv_qp_init_attr init_attr;
385         int qp_depth = td->o.iodepth * 2;       /* 2 times of io depth */
386
387         if (rd->is_client == 0)
388                 rd->pd = ibv_alloc_pd(rd->child_cm_id->verbs);
389         else
390                 rd->pd = ibv_alloc_pd(rd->cm_id->verbs);
391
392         if (rd->pd == NULL) {
393                 log_err("fio: ibv_alloc_pd fail\n");
394                 return 1;
395         }
396
397         if (rd->is_client == 0)
398                 rd->channel = ibv_create_comp_channel(rd->child_cm_id->verbs);
399         else
400                 rd->channel = ibv_create_comp_channel(rd->cm_id->verbs);
401         if (rd->channel == NULL) {
402                 log_err("fio: ibv_create_comp_channel fail\n");
403                 goto err1;
404         }
405
406         if (qp_depth < 16)
407                 qp_depth = 16;
408
409         if (rd->is_client == 0)
410                 rd->cq = ibv_create_cq(rd->child_cm_id->verbs,
411                                        qp_depth, rd, rd->channel, 0);
412         else
413                 rd->cq = ibv_create_cq(rd->cm_id->verbs,
414                                        qp_depth, rd, rd->channel, 0);
415         if (rd->cq == NULL) {
416                 log_err("fio: ibv_create_cq failed\n");
417                 goto err2;
418         }
419
420         if (ibv_req_notify_cq(rd->cq, 0) != 0) {
421                 log_err("fio: ibv_create_cq failed\n");
422                 goto err3;
423         }
424
425         /* create queue pair */
426         memset(&init_attr, 0, sizeof(init_attr));
427         init_attr.cap.max_send_wr = qp_depth;
428         init_attr.cap.max_recv_wr = qp_depth;
429         init_attr.cap.max_recv_sge = 1;
430         init_attr.cap.max_send_sge = 1;
431         init_attr.qp_type = IBV_QPT_RC;
432         init_attr.send_cq = rd->cq;
433         init_attr.recv_cq = rd->cq;
434
435         if (rd->is_client == 0) {
436                 if (rdma_create_qp(rd->child_cm_id, rd->pd, &init_attr) != 0) {
437                         log_err("fio: rdma_create_qp failed\n");
438                         goto err3;
439                 }
440                 rd->qp = rd->child_cm_id->qp;
441         } else {
442                 if (rdma_create_qp(rd->cm_id, rd->pd, &init_attr) != 0) {
443                         log_err("fio: rdma_create_qp failed\n");
444                         goto err3;
445                 }
446                 rd->qp = rd->cm_id->qp;
447         }
448
449         return 0;
450
451 err3:
452         ibv_destroy_cq(rd->cq);
453 err2:
454         ibv_destroy_comp_channel(rd->channel);
455 err1:
456         ibv_dealloc_pd(rd->pd);
457
458         return 1;
459 }
460
461 static int fio_rdmaio_setup_control_msg_buffers(struct thread_data *td)
462 {
463         struct rdmaio_data *rd = td->io_ops->data;
464
465         rd->recv_mr = ibv_reg_mr(rd->pd, &rd->recv_buf, sizeof(rd->recv_buf),
466                                  IBV_ACCESS_LOCAL_WRITE);
467         if (rd->recv_mr == NULL) {
468                 log_err("fio: recv_buf reg_mr failed\n");
469                 return 1;
470         }
471
472         rd->send_mr = ibv_reg_mr(rd->pd, &rd->send_buf, sizeof(rd->send_buf),
473                                  0);
474         if (rd->send_mr == NULL) {
475                 log_err("fio: send_buf reg_mr failed\n");
476                 ibv_dereg_mr(rd->recv_mr);
477                 return 1;
478         }
479
480         /* setup work request */
481         /* recv wq */
482         rd->recv_sgl.addr = (uint64_t) (unsigned long)&rd->recv_buf;
483         rd->recv_sgl.length = sizeof(rd->recv_buf);
484         rd->recv_sgl.lkey = rd->recv_mr->lkey;
485         rd->rq_wr.sg_list = &rd->recv_sgl;
486         rd->rq_wr.num_sge = 1;
487         rd->rq_wr.wr_id = FIO_RDMA_MAX_IO_DEPTH;
488
489         /* send wq */
490         rd->send_sgl.addr = (uint64_t) (unsigned long)&rd->send_buf;
491         rd->send_sgl.length = sizeof(rd->send_buf);
492         rd->send_sgl.lkey = rd->send_mr->lkey;
493
494         rd->sq_wr.opcode = IBV_WR_SEND;
495         rd->sq_wr.send_flags = IBV_SEND_SIGNALED;
496         rd->sq_wr.sg_list = &rd->send_sgl;
497         rd->sq_wr.num_sge = 1;
498         rd->sq_wr.wr_id = FIO_RDMA_MAX_IO_DEPTH;
499
500         return 0;
501 }
502
503 static int get_next_channel_event(struct thread_data *td,
504                                   struct rdma_event_channel *channel,
505                                   enum rdma_cm_event_type wait_event)
506 {
507         struct rdmaio_data *rd = td->io_ops->data;
508         struct rdma_cm_event *event;
509         int ret;
510
511         ret = rdma_get_cm_event(channel, &event);
512         if (ret) {
513                 log_err("fio: rdma_get_cm_event: %d\n", ret);
514                 return 1;
515         }
516
517         if (event->event != wait_event) {
518                 log_err("fio: event is %s instead of %s\n",
519                         rdma_event_str(event->event),
520                         rdma_event_str(wait_event));
521                 return 1;
522         }
523
524         switch (event->event) {
525         case RDMA_CM_EVENT_CONNECT_REQUEST:
526                 rd->child_cm_id = event->id;
527                 break;
528         default:
529                 break;
530         }
531
532         rdma_ack_cm_event(event);
533
534         return 0;
535 }
536
537 static int fio_rdmaio_prep(struct thread_data *td, struct io_u *io_u)
538 {
539         struct rdmaio_data *rd = td->io_ops->data;
540         struct rdma_io_u_data *r_io_u_d;
541
542         r_io_u_d = io_u->engine_data;
543
544         switch (rd->rdma_protocol) {
545         case FIO_RDMA_MEM_WRITE:
546         case FIO_RDMA_MEM_READ:
547                 r_io_u_d->rdma_sgl.addr = (uint64_t) (unsigned long)io_u->buf;
548                 r_io_u_d->rdma_sgl.lkey = io_u->mr->lkey;
549                 r_io_u_d->sq_wr.wr_id = r_io_u_d->wr_id;
550                 r_io_u_d->sq_wr.send_flags = IBV_SEND_SIGNALED;
551                 r_io_u_d->sq_wr.sg_list = &r_io_u_d->rdma_sgl;
552                 r_io_u_d->sq_wr.num_sge = 1;
553                 break;
554         case FIO_RDMA_CHA_SEND:
555                 r_io_u_d->rdma_sgl.addr = (uint64_t) (unsigned long)io_u->buf;
556                 r_io_u_d->rdma_sgl.lkey = io_u->mr->lkey;
557                 r_io_u_d->rdma_sgl.length = io_u->buflen;
558                 r_io_u_d->sq_wr.wr_id = r_io_u_d->wr_id;
559                 r_io_u_d->sq_wr.opcode = IBV_WR_SEND;
560                 r_io_u_d->sq_wr.send_flags = IBV_SEND_SIGNALED;
561                 r_io_u_d->sq_wr.sg_list = &r_io_u_d->rdma_sgl;
562                 r_io_u_d->sq_wr.num_sge = 1;
563                 break;
564         case FIO_RDMA_CHA_RECV:
565                 r_io_u_d->rdma_sgl.addr = (uint64_t) (unsigned long)io_u->buf;
566                 r_io_u_d->rdma_sgl.lkey = io_u->mr->lkey;
567                 r_io_u_d->rdma_sgl.length = io_u->buflen;
568                 r_io_u_d->rq_wr.wr_id = r_io_u_d->wr_id;
569                 r_io_u_d->rq_wr.sg_list = &r_io_u_d->rdma_sgl;
570                 r_io_u_d->rq_wr.num_sge = 1;
571                 break;
572         default:
573                 log_err("fio: unknown rdma protocol - %d\n", rd->rdma_protocol);
574                 break;
575         }
576
577         return 0;
578 }
579
580 static struct io_u *fio_rdmaio_event(struct thread_data *td, int event)
581 {
582         struct rdmaio_data *rd = td->io_ops->data;
583         struct io_u *io_u;
584         int i;
585
586         io_u = rd->io_us_completed[0];
587         for (i = 0; i < rd->io_u_completed_nr - 1; i++)
588                 rd->io_us_completed[i] = rd->io_us_completed[i + 1];
589
590         rd->io_u_completed_nr--;
591
592         dprint_io_u(io_u, "fio_rdmaio_event");
593
594         return io_u;
595 }
596
597 static int fio_rdmaio_getevents(struct thread_data *td, unsigned int min,
598                                 unsigned int max, const struct timespec *t)
599 {
600         struct rdmaio_data *rd = td->io_ops->data;
601         enum ibv_wc_opcode comp_opcode;
602         struct ibv_cq *ev_cq;
603         void *ev_ctx;
604         int ret, r = 0;
605         comp_opcode = IBV_WC_RDMA_WRITE;
606
607         switch (rd->rdma_protocol) {
608         case FIO_RDMA_MEM_WRITE:
609                 comp_opcode = IBV_WC_RDMA_WRITE;
610                 break;
611         case FIO_RDMA_MEM_READ:
612                 comp_opcode = IBV_WC_RDMA_READ;
613                 break;
614         case FIO_RDMA_CHA_SEND:
615                 comp_opcode = IBV_WC_SEND;
616                 break;
617         case FIO_RDMA_CHA_RECV:
618                 comp_opcode = IBV_WC_RECV;
619                 break;
620         default:
621                 log_err("fio: unknown rdma protocol - %d\n", rd->rdma_protocol);
622                 break;
623         }
624
625         if (rd->cq_event_num > 0) {     /* previous left */
626                 rd->cq_event_num--;
627                 return 0;
628         }
629
630 again:
631         if (ibv_get_cq_event(rd->channel, &ev_cq, &ev_ctx) != 0) {
632                 log_err("fio: Failed to get cq event!\n");
633                 return -1;
634         }
635         if (ev_cq != rd->cq) {
636                 log_err("fio: Unknown CQ!\n");
637                 return -1;
638         }
639         if (ibv_req_notify_cq(rd->cq, 0) != 0) {
640                 log_err("fio: Failed to set notify!\n");
641                 return -1;
642         }
643
644         ret = cq_event_handler(td, comp_opcode);
645         if (ret < 1)
646                 goto again;
647
648         ibv_ack_cq_events(rd->cq, ret);
649
650         r += ret;
651         if (r < min)
652                 goto again;
653
654         rd->cq_event_num -= r;
655
656         return r;
657 }
658
659 static int fio_rdmaio_send(struct thread_data *td, struct io_u **io_us,
660                            unsigned int nr)
661 {
662         struct rdmaio_data *rd = td->io_ops->data;
663         struct ibv_send_wr *bad_wr;
664 #if 0
665         enum ibv_wc_opcode comp_opcode;
666         comp_opcode = IBV_WC_RDMA_WRITE;
667 #endif
668         int i;
669         long index;
670         struct rdma_io_u_data *r_io_u_d;
671
672         r_io_u_d = NULL;
673
674         for (i = 0; i < nr; i++) {
675                 /* RDMA_WRITE or RDMA_READ */
676                 switch (rd->rdma_protocol) {
677                 case FIO_RDMA_MEM_WRITE:
678                         /* compose work request */
679                         r_io_u_d = io_us[i]->engine_data;
680                         index = __rand(&rd->rand_state) % rd->rmt_nr;
681                         r_io_u_d->sq_wr.opcode = IBV_WR_RDMA_WRITE;
682                         r_io_u_d->sq_wr.wr.rdma.rkey = rd->rmt_us[index].rkey;
683                         r_io_u_d->sq_wr.wr.rdma.remote_addr = \
684                                 rd->rmt_us[index].buf;
685                         r_io_u_d->sq_wr.sg_list->length = io_us[i]->buflen;
686                         break;
687                 case FIO_RDMA_MEM_READ:
688                         /* compose work request */
689                         r_io_u_d = io_us[i]->engine_data;
690                         index = __rand(&rd->rand_state) % rd->rmt_nr;
691                         r_io_u_d->sq_wr.opcode = IBV_WR_RDMA_READ;
692                         r_io_u_d->sq_wr.wr.rdma.rkey = rd->rmt_us[index].rkey;
693                         r_io_u_d->sq_wr.wr.rdma.remote_addr = \
694                                 rd->rmt_us[index].buf;
695                         r_io_u_d->sq_wr.sg_list->length = io_us[i]->buflen;
696                         break;
697                 case FIO_RDMA_CHA_SEND:
698                         r_io_u_d = io_us[i]->engine_data;
699                         r_io_u_d->sq_wr.opcode = IBV_WR_SEND;
700                         r_io_u_d->sq_wr.send_flags = IBV_SEND_SIGNALED;
701                         break;
702                 default:
703                         log_err("fio: unknown rdma protocol - %d\n",
704                                 rd->rdma_protocol);
705                         break;
706                 }
707
708                 if (ibv_post_send(rd->qp, &r_io_u_d->sq_wr, &bad_wr) != 0) {
709                         log_err("fio: ibv_post_send fail\n");
710                         return -1;
711                 }
712
713                 dprint_io_u(io_us[i], "fio_rdmaio_send");
714         }
715
716         /* wait for completion
717            rdma_poll_wait(td, comp_opcode); */
718
719         return i;
720 }
721
722 static int fio_rdmaio_recv(struct thread_data *td, struct io_u **io_us,
723                            unsigned int nr)
724 {
725         struct rdmaio_data *rd = td->io_ops->data;
726         struct ibv_recv_wr *bad_wr;
727         struct rdma_io_u_data *r_io_u_d;
728         int i;
729
730         i = 0;
731         if (rd->rdma_protocol == FIO_RDMA_CHA_RECV) {
732                 /* post io_u into recv queue */
733                 for (i = 0; i < nr; i++) {
734                         r_io_u_d = io_us[i]->engine_data;
735                         if (ibv_post_recv(rd->qp, &r_io_u_d->rq_wr, &bad_wr) !=
736                             0) {
737                                 log_err("fio: ibv_post_recv fail\n");
738                                 return 1;
739                         }
740                 }
741         } else if ((rd->rdma_protocol == FIO_RDMA_MEM_READ)
742                    || (rd->rdma_protocol == FIO_RDMA_MEM_WRITE)) {
743                 /* re-post the rq_wr */
744                 if (ibv_post_recv(rd->qp, &rd->rq_wr, &bad_wr) != 0) {
745                         log_err("fio: ibv_post_recv fail\n");
746                         return 1;
747                 }
748
749                 rdma_poll_wait(td, IBV_WC_RECV);
750
751                 dprint(FD_IO, "fio: recv FINISH message\n");
752                 td->done = 1;
753                 return 0;
754         }
755
756         return i;
757 }
758
759 static int fio_rdmaio_queue(struct thread_data *td, struct io_u *io_u)
760 {
761         struct rdmaio_data *rd = td->io_ops->data;
762
763         fio_ro_check(td, io_u);
764
765         if (rd->io_u_queued_nr == (int)td->o.iodepth)
766                 return FIO_Q_BUSY;
767
768         rd->io_us_queued[rd->io_u_queued_nr] = io_u;
769         rd->io_u_queued_nr++;
770
771         dprint_io_u(io_u, "fio_rdmaio_queue");
772
773         return FIO_Q_QUEUED;
774 }
775
776 static void fio_rdmaio_queued(struct thread_data *td, struct io_u **io_us,
777                               unsigned int nr)
778 {
779         struct rdmaio_data *rd = td->io_ops->data;
780         struct timeval now;
781         unsigned int i;
782
783         if (!fio_fill_issue_time(td))
784                 return;
785
786         fio_gettime(&now, NULL);
787
788         for (i = 0; i < nr; i++) {
789                 struct io_u *io_u = io_us[i];
790
791                 /* queued -> flight */
792                 rd->io_us_flight[rd->io_u_flight_nr] = io_u;
793                 rd->io_u_flight_nr++;
794
795                 memcpy(&io_u->issue_time, &now, sizeof(now));
796                 io_u_queued(td, io_u);
797         }
798 }
799
800 static int fio_rdmaio_commit(struct thread_data *td)
801 {
802         struct rdmaio_data *rd = td->io_ops->data;
803         struct io_u **io_us;
804         int ret;
805
806         if (!rd->io_us_queued)
807                 return 0;
808
809         io_us = rd->io_us_queued;
810         do {
811                 /* RDMA_WRITE or RDMA_READ */
812                 if (rd->is_client)
813                         ret = fio_rdmaio_send(td, io_us, rd->io_u_queued_nr);
814                 else if (!rd->is_client)
815                         ret = fio_rdmaio_recv(td, io_us, rd->io_u_queued_nr);
816                 else
817                         ret = 0;        /* must be a SYNC */
818
819                 if (ret > 0) {
820                         fio_rdmaio_queued(td, io_us, ret);
821                         io_u_mark_submit(td, ret);
822                         rd->io_u_queued_nr -= ret;
823                         io_us += ret;
824                         ret = 0;
825                 } else
826                         break;
827         } while (rd->io_u_queued_nr);
828
829         return ret;
830 }
831
832 static int fio_rdmaio_connect(struct thread_data *td, struct fio_file *f)
833 {
834         struct rdmaio_data *rd = td->io_ops->data;
835         struct rdma_conn_param conn_param;
836         struct ibv_send_wr *bad_wr;
837
838         memset(&conn_param, 0, sizeof(conn_param));
839         conn_param.responder_resources = 1;
840         conn_param.initiator_depth = 1;
841         conn_param.retry_count = 10;
842
843         if (rdma_connect(rd->cm_id, &conn_param) != 0) {
844                 log_err("fio: rdma_connect fail\n");
845                 return 1;
846         }
847
848         if (get_next_channel_event
849             (td, rd->cm_channel, RDMA_CM_EVENT_ESTABLISHED) != 0) {
850                 log_err("fio: wait for RDMA_CM_EVENT_ESTABLISHED\n");
851                 return 1;
852         }
853
854         /* send task request */
855         rd->send_buf.mode = htonl(rd->rdma_protocol);
856         rd->send_buf.nr = htonl(td->o.iodepth);
857
858         if (ibv_post_send(rd->qp, &rd->sq_wr, &bad_wr) != 0) {
859                 log_err("fio: ibv_post_send fail");
860                 return 1;
861         }
862
863         rdma_poll_wait(td, IBV_WC_SEND);
864
865         /* wait for remote MR info from server side */
866         rdma_poll_wait(td, IBV_WC_RECV);
867
868         /* In SEND/RECV test, it's a good practice to setup the iodepth of
869          * of the RECV side deeper than that of the SEND side to
870          * avoid RNR (receiver not ready) error. The
871          * SEND side may send so many unsolicited message before
872          * RECV side commits sufficient recv buffers into recv queue.
873          * This may lead to RNR error. Here, SEND side pauses for a while
874          * during which RECV side commits sufficient recv buffers.
875          */
876         usleep(500000);
877
878         return 0;
879 }
880
881 static int fio_rdmaio_accept(struct thread_data *td, struct fio_file *f)
882 {
883         struct rdmaio_data *rd = td->io_ops->data;
884         struct rdma_conn_param conn_param;
885         struct ibv_send_wr *bad_wr;
886
887         /* rdma_accept() - then wait for accept success */
888         memset(&conn_param, 0, sizeof(conn_param));
889         conn_param.responder_resources = 1;
890         conn_param.initiator_depth = 1;
891
892         if (rdma_accept(rd->child_cm_id, &conn_param) != 0) {
893                 log_err("fio: rdma_accept\n");
894                 return 1;
895         }
896
897         if (get_next_channel_event
898             (td, rd->cm_channel, RDMA_CM_EVENT_ESTABLISHED) != 0) {
899                 log_err("fio: wait for RDMA_CM_EVENT_ESTABLISHED\n");
900                 return 1;
901         }
902
903         /* wait for request */
904         rdma_poll_wait(td, IBV_WC_RECV);
905
906         if (ibv_post_send(rd->qp, &rd->sq_wr, &bad_wr) != 0) {
907                 log_err("fio: ibv_post_send fail");
908                 return 1;
909         }
910
911         rdma_poll_wait(td, IBV_WC_SEND);
912
913         return 0;
914 }
915
916 static int fio_rdmaio_open_file(struct thread_data *td, struct fio_file *f)
917 {
918         if (td_read(td))
919                 return fio_rdmaio_accept(td, f);
920         else
921                 return fio_rdmaio_connect(td, f);
922 }
923
924 static int fio_rdmaio_close_file(struct thread_data *td, struct fio_file *f)
925 {
926         struct rdmaio_data *rd = td->io_ops->data;
927         struct ibv_send_wr *bad_wr;
928
929         /* unregister rdma buffer */
930
931         /*
932          * Client sends notification to the server side
933          */
934         /* refer to: http://linux.die.net/man/7/rdma_cm */
935         if ((rd->is_client == 1) && ((rd->rdma_protocol == FIO_RDMA_MEM_WRITE)
936                                      || (rd->rdma_protocol ==
937                                          FIO_RDMA_MEM_READ))) {
938                 if (ibv_post_send(rd->qp, &rd->sq_wr, &bad_wr) != 0) {
939                         log_err("fio: ibv_post_send fail");
940                         return 1;
941                 }
942
943                 dprint(FD_IO, "fio: close information sent success\n");
944                 rdma_poll_wait(td, IBV_WC_SEND);
945         }
946
947         if (rd->is_client == 1)
948                 rdma_disconnect(rd->cm_id);
949         else {
950                 rdma_disconnect(rd->child_cm_id);
951 #if 0
952                 rdma_disconnect(rd->cm_id);
953 #endif
954         }
955
956 #if 0
957         if (get_next_channel_event(td, rd->cm_channel, RDMA_CM_EVENT_DISCONNECTED) != 0) {
958                 log_err("fio: wait for RDMA_CM_EVENT_DISCONNECTED\n");
959                 return 1;
960         }
961 #endif
962
963         ibv_destroy_cq(rd->cq);
964         ibv_destroy_qp(rd->qp);
965
966         if (rd->is_client == 1)
967                 rdma_destroy_id(rd->cm_id);
968         else {
969                 rdma_destroy_id(rd->child_cm_id);
970                 rdma_destroy_id(rd->cm_id);
971         }
972
973         ibv_destroy_comp_channel(rd->channel);
974         ibv_dealloc_pd(rd->pd);
975
976         return 0;
977 }
978
979 static int fio_rdmaio_setup_connect(struct thread_data *td, const char *host,
980                                     unsigned short port)
981 {
982         struct rdmaio_data *rd = td->io_ops->data;
983         struct ibv_recv_wr *bad_wr;
984         int err;
985
986         rd->addr.sin_family = AF_INET;
987         rd->addr.sin_port = htons(port);
988
989         if (inet_aton(host, &rd->addr.sin_addr) != 1) {
990                 struct hostent *hent;
991
992                 hent = gethostbyname(host);
993                 if (!hent) {
994                         td_verror(td, errno, "gethostbyname");
995                         return 1;
996                 }
997
998                 memcpy(&rd->addr.sin_addr, hent->h_addr, 4);
999         }
1000
1001         /* resolve route */
1002         err = rdma_resolve_addr(rd->cm_id, NULL, (struct sockaddr *)&rd->addr, 2000);
1003         if (err != 0) {
1004                 log_err("fio: rdma_resolve_addr: %d\n", err);
1005                 return 1;
1006         }
1007
1008         err = get_next_channel_event(td, rd->cm_channel, RDMA_CM_EVENT_ADDR_RESOLVED);
1009         if (err != 0) {
1010                 log_err("fio: get_next_channel_event: %d\n", err);
1011                 return 1;
1012         }
1013
1014         /* resolve route */
1015         err = rdma_resolve_route(rd->cm_id, 2000);
1016         if (err != 0) {
1017                 log_err("fio: rdma_resolve_route: %d\n", err);
1018                 return 1;
1019         }
1020
1021         err = get_next_channel_event(td, rd->cm_channel, RDMA_CM_EVENT_ROUTE_RESOLVED);
1022         if (err != 0) {
1023                 log_err("fio: get_next_channel_event: %d\n", err);
1024                 return 1;
1025         }
1026
1027         /* create qp and buffer */
1028         if (fio_rdmaio_setup_qp(td) != 0)
1029                 return 1;
1030
1031         if (fio_rdmaio_setup_control_msg_buffers(td) != 0)
1032                 return 1;
1033
1034         /* post recv buf */
1035         err = ibv_post_recv(rd->qp, &rd->rq_wr, &bad_wr);
1036         if (err != 0) {
1037                 log_err("fio: ibv_post_recv fail: %d\n", err);
1038                 return 1;
1039         }
1040
1041         return 0;
1042 }
1043
1044 static int fio_rdmaio_setup_listen(struct thread_data *td, short port)
1045 {
1046         struct rdmaio_data *rd = td->io_ops->data;
1047         struct ibv_recv_wr *bad_wr;
1048
1049         rd->addr.sin_family = AF_INET;
1050         rd->addr.sin_addr.s_addr = htonl(INADDR_ANY);
1051         rd->addr.sin_port = htons(port);
1052
1053         /* rdma_listen */
1054         if (rdma_bind_addr(rd->cm_id, (struct sockaddr *)&rd->addr) != 0) {
1055                 log_err("fio: rdma_bind_addr fail\n");
1056                 return 1;
1057         }
1058
1059         if (rdma_listen(rd->cm_id, 3) != 0) {
1060                 log_err("fio: rdma_listen fail\n");
1061                 return 1;
1062         }
1063
1064         /* wait for CONNECT_REQUEST */
1065         if (get_next_channel_event
1066             (td, rd->cm_channel, RDMA_CM_EVENT_CONNECT_REQUEST) != 0) {
1067                 log_err("fio: wait for RDMA_CM_EVENT_CONNECT_REQUEST\n");
1068                 return 1;
1069         }
1070
1071         if (fio_rdmaio_setup_qp(td) != 0)
1072                 return 1;
1073
1074         if (fio_rdmaio_setup_control_msg_buffers(td) != 0)
1075                 return 1;
1076
1077         /* post recv buf */
1078         if (ibv_post_recv(rd->qp, &rd->rq_wr, &bad_wr) != 0) {
1079                 log_err("fio: ibv_post_recv fail\n");
1080                 return 1;
1081         }
1082
1083         return 0;
1084 }
1085
1086 static int check_set_rlimits(struct thread_data *td)
1087 {
1088 #ifdef CONFIG_RLIMIT_MEMLOCK
1089         struct rlimit rl;
1090
1091         /* check RLIMIT_MEMLOCK */
1092         if (getrlimit(RLIMIT_MEMLOCK, &rl) != 0) {
1093                 log_err("fio: getrlimit fail: %d(%s)\n",
1094                         errno, strerror(errno));
1095                 return 1;
1096         }
1097
1098         /* soft limit */
1099         if ((rl.rlim_cur != RLIM_INFINITY)
1100             && (rl.rlim_cur < td->orig_buffer_size)) {
1101                 log_err("fio: soft RLIMIT_MEMLOCK is: %" PRId64 "\n",
1102                         rl.rlim_cur);
1103                 log_err("fio: total block size is:    %zd\n",
1104                         td->orig_buffer_size);
1105                 /* try to set larger RLIMIT_MEMLOCK */
1106                 rl.rlim_cur = rl.rlim_max;
1107                 if (setrlimit(RLIMIT_MEMLOCK, &rl) != 0) {
1108                         log_err("fio: setrlimit fail: %d(%s)\n",
1109                                 errno, strerror(errno));
1110                         log_err("fio: you may try enlarge MEMLOCK by root\n");
1111                         log_err("# ulimit -l unlimited\n");
1112                         return 1;
1113                 }
1114         }
1115 #endif
1116
1117         return 0;
1118 }
1119
1120 static int compat_options(struct thread_data *td)
1121 {
1122         // The original RDMA engine had an ugly / seperator
1123         // on the filename for it's options. This function
1124         // retains backwards compatibility with it.100
1125
1126         struct rdmaio_options *o = td->eo;
1127         char *modep, *portp;
1128         char *filename = td->o.filename;
1129
1130         if (!filename)
1131                 return 0;
1132
1133         portp = strchr(filename, '/');
1134         if (portp == NULL)
1135                 return 0;
1136
1137         *portp = '\0';
1138         portp++;
1139
1140         o->port = strtol(portp, NULL, 10);
1141         if (!o->port || o->port > 65535)
1142                 goto bad_host;
1143
1144         modep = strchr(portp, '/');
1145         if (modep != NULL) {
1146                 *modep = '\0';
1147                 modep++;
1148         }
1149
1150         if (modep) {
1151                 if (!strncmp("rdma_write", modep, strlen(modep)) ||
1152                     !strncmp("RDMA_WRITE", modep, strlen(modep)))
1153                         o->verb = FIO_RDMA_MEM_WRITE;
1154                 else if (!strncmp("rdma_read", modep, strlen(modep)) ||
1155                          !strncmp("RDMA_READ", modep, strlen(modep)))
1156                         o->verb = FIO_RDMA_MEM_READ;
1157                 else if (!strncmp("send", modep, strlen(modep)) ||
1158                          !strncmp("SEND", modep, strlen(modep)))
1159                         o->verb = FIO_RDMA_CHA_SEND;
1160                 else
1161                         goto bad_host;
1162         } else
1163                 o->verb = FIO_RDMA_MEM_WRITE;
1164
1165
1166         return 0;
1167
1168 bad_host:
1169         log_err("fio: bad rdma host/port/protocol: %s\n", td->o.filename);
1170         return 1;
1171 }
1172
1173 static int fio_rdmaio_init(struct thread_data *td)
1174 {
1175         struct rdmaio_data *rd = td->io_ops->data;
1176         struct rdmaio_options *o = td->eo;
1177         unsigned int max_bs;
1178         int ret, i;
1179
1180         if (td_rw(td)) {
1181                 log_err("fio: rdma connections must be read OR write\n");
1182                 return 1;
1183         }
1184         if (td_random(td)) {
1185                 log_err("fio: RDMA network IO can't be random\n");
1186                 return 1;
1187         }
1188
1189         if (compat_options(td))
1190                 return 1;
1191
1192         if (!o->port) {
1193                 log_err("fio: no port has been specified which is required "
1194                         "for the rdma engine\n");
1195                 return 1;
1196         }
1197
1198         if (check_set_rlimits(td))
1199                 return 1;
1200
1201         rd->rdma_protocol = o->verb;
1202         rd->cq_event_num = 0;
1203
1204         rd->cm_channel = rdma_create_event_channel();
1205         if (!rd->cm_channel) {
1206                 log_err("fio: rdma_create_event_channel fail\n");
1207                 return 1;
1208         }
1209
1210         ret = rdma_create_id(rd->cm_channel, &rd->cm_id, rd, RDMA_PS_TCP);
1211         if (ret) {
1212                 log_err("fio: rdma_create_id fail\n");
1213                 return 1;
1214         }
1215
1216         if ((rd->rdma_protocol == FIO_RDMA_MEM_WRITE) ||
1217             (rd->rdma_protocol == FIO_RDMA_MEM_READ)) {
1218                 rd->rmt_us =
1219                         malloc(FIO_RDMA_MAX_IO_DEPTH * sizeof(struct remote_u));
1220                 memset(rd->rmt_us, 0,
1221                         FIO_RDMA_MAX_IO_DEPTH * sizeof(struct remote_u));
1222                 rd->rmt_nr = 0;
1223         }
1224
1225         rd->io_us_queued = malloc(td->o.iodepth * sizeof(struct io_u *));
1226         memset(rd->io_us_queued, 0, td->o.iodepth * sizeof(struct io_u *));
1227         rd->io_u_queued_nr = 0;
1228
1229         rd->io_us_flight = malloc(td->o.iodepth * sizeof(struct io_u *));
1230         memset(rd->io_us_flight, 0, td->o.iodepth * sizeof(struct io_u *));
1231         rd->io_u_flight_nr = 0;
1232
1233         rd->io_us_completed = malloc(td->o.iodepth * sizeof(struct io_u *));
1234         memset(rd->io_us_completed, 0, td->o.iodepth * sizeof(struct io_u *));
1235         rd->io_u_completed_nr = 0;
1236
1237         if (td_read(td)) {      /* READ as the server */
1238                 rd->is_client = 0;
1239                 /* server rd->rdma_buf_len will be setup after got request */
1240                 ret = fio_rdmaio_setup_listen(td, o->port);
1241         } else {                /* WRITE as the client */
1242                 rd->is_client = 1;
1243                 ret = fio_rdmaio_setup_connect(td, td->o.filename, o->port);
1244         }
1245
1246         max_bs = max(td->o.max_bs[DDIR_READ], td->o.max_bs[DDIR_WRITE]);
1247         /* register each io_u in the free list */
1248         for (i = 0; i < td->io_u_freelist.nr; i++) {
1249                 struct io_u *io_u = td->io_u_freelist.io_us[i];
1250
1251                 io_u->engine_data = malloc(sizeof(struct rdma_io_u_data));
1252                 memset(io_u->engine_data, 0, sizeof(struct rdma_io_u_data));
1253                 ((struct rdma_io_u_data *)io_u->engine_data)->wr_id = i;
1254
1255                 io_u->mr = ibv_reg_mr(rd->pd, io_u->buf, max_bs,
1256                                       IBV_ACCESS_LOCAL_WRITE |
1257                                       IBV_ACCESS_REMOTE_READ |
1258                                       IBV_ACCESS_REMOTE_WRITE);
1259                 if (io_u->mr == NULL) {
1260                         log_err("fio: ibv_reg_mr io_u failed\n");
1261                         return 1;
1262                 }
1263
1264                 rd->send_buf.rmt_us[i].buf =
1265                     htonll((uint64_t) (unsigned long)io_u->buf);
1266                 rd->send_buf.rmt_us[i].rkey = htonl(io_u->mr->rkey);
1267                 rd->send_buf.rmt_us[i].size = htonl(max_bs);
1268
1269 #if 0
1270                 log_info("fio: Send rkey %x addr %" PRIx64 " len %d to client\n", io_u->mr->rkey, io_u->buf, max_bs); */
1271 #endif
1272         }
1273
1274         rd->send_buf.nr = htonl(i);
1275
1276         return ret;
1277 }
1278
1279 static void fio_rdmaio_cleanup(struct thread_data *td)
1280 {
1281         struct rdmaio_data *rd = td->io_ops->data;
1282
1283         if (rd)
1284                 free(rd);
1285 }
1286
1287 static int fio_rdmaio_setup(struct thread_data *td)
1288 {
1289         struct rdmaio_data *rd;
1290
1291         if (!td->files_index) {
1292                 add_file(td, td->o.filename ?: "rdma", 0, 0);
1293                 td->o.nr_files = td->o.nr_files ?: 1;
1294                 td->o.open_files++;
1295         }
1296
1297         if (!td->io_ops->data) {
1298                 rd = malloc(sizeof(*rd));
1299
1300                 memset(rd, 0, sizeof(*rd));
1301                 init_rand_seed(&rd->rand_state, (unsigned int) GOLDEN_RATIO_PRIME, 0);
1302                 td->io_ops->data = rd;
1303         }
1304
1305         return 0;
1306 }
1307
1308 static struct ioengine_ops ioengine_rw = {
1309         .name                   = "rdma",
1310         .version                = FIO_IOOPS_VERSION,
1311         .setup                  = fio_rdmaio_setup,
1312         .init                   = fio_rdmaio_init,
1313         .prep                   = fio_rdmaio_prep,
1314         .queue                  = fio_rdmaio_queue,
1315         .commit                 = fio_rdmaio_commit,
1316         .getevents              = fio_rdmaio_getevents,
1317         .event                  = fio_rdmaio_event,
1318         .cleanup                = fio_rdmaio_cleanup,
1319         .open_file              = fio_rdmaio_open_file,
1320         .close_file             = fio_rdmaio_close_file,
1321         .flags                  = FIO_DISKLESSIO | FIO_UNIDIR | FIO_PIPEIO,
1322         .options                = options,
1323         .option_struct_size     = sizeof(struct rdmaio_options),
1324 };
1325
1326 static void fio_init fio_rdmaio_register(void)
1327 {
1328         register_ioengine(&ioengine_rw);
1329 }
1330
1331 static void fio_exit fio_rdmaio_unregister(void)
1332 {
1333         unregister_ioengine(&ioengine_rw);
1334 }