26334328cddf3fed500cc155741f7ffaba006f5b
[fio.git] / engines / rdma.c
1 /*
2  * RDMA I/O engine
3  *
4  * RDMA I/O engine based on the IB verbs and RDMA/CM user space libraries.
5  * Supports both RDMA memory semantics and channel semantics
6  *   for the InfiniBand, RoCE and iWARP protocols.
7  *
8  * This I/O engine is disabled by default. To enable it, execute:
9  *
10  * $ export EXTFLAGS+=" -DFIO_HAVE_RDMA "
11  * $ export EXTLIBS+=" -libverbs -lrdmacm "
12  *
13  * before running make. You will need the Linux RDMA software as well, either
14  * from your Linux distributor or directly from openfabrics.org:
15  *
16  * http://www.openfabrics.org/downloads/OFED/
17  *
18  * Exchanging steps of RDMA ioengine control messages:
19  *      1. client side sends test mode (RDMA_WRITE/RDMA_READ/SEND)
20  *         to server side.
21  *      2. server side parses test mode, and sends back confirmation
22  *         to client side. In RDMA WRITE/READ test, this confirmation
23  *         includes memory information, such as rkey, address.
24  *      3. client side initiates test loop.
25  *      4. In RDMA WRITE/READ test, client side sends a completion
26  *         notification to server side. Server side updates its
27  *         td->done as true.
28  *
29  */
30 #include <stdio.h>
31 #include <stdlib.h>
32 #include <unistd.h>
33 #include <errno.h>
34 #include <assert.h>
35 #include <netinet/in.h>
36 #include <arpa/inet.h>
37 #include <netdb.h>
38 #include <sys/poll.h>
39 #include <sys/types.h>
40 #include <sys/socket.h>
41 #include <sys/time.h>
42 #include <sys/resource.h>
43
44 #include <byteswap.h>
45 #include <pthread.h>
46 #include <inttypes.h>
47
48 #include "../fio.h"
49
50 #ifdef FIO_HAVE_RDMA
51
52 #include <rdma/rdma_cma.h>
53 #include <infiniband/arch.h>
54
55 #define FIO_RDMA_MAX_IO_DEPTH    512
56
57 enum rdma_io_mode {
58         FIO_RDMA_UNKNOWN = 0,
59         FIO_RDMA_MEM_WRITE,
60         FIO_RDMA_MEM_READ,
61         FIO_RDMA_CHA_SEND,
62         FIO_RDMA_CHA_RECV
63 };
64
65 struct remote_u {
66         uint64_t buf;
67         uint32_t rkey;
68         uint32_t size;
69 };
70
71 struct rdma_info_blk {
72         uint32_t mode;          /* channel semantic or memory semantic */
73         uint32_t nr;            /* client: io depth
74                                    server: number of records for memory semantic
75                                  */
76         struct remote_u rmt_us[FIO_RDMA_MAX_IO_DEPTH];
77 };
78
79 struct rdma_io_u_data {
80         uint64_t wr_id;
81         struct ibv_send_wr sq_wr;
82         struct ibv_recv_wr rq_wr;
83         struct ibv_sge rdma_sgl;
84 };
85
86 struct rdmaio_data {
87         int is_client;
88         enum rdma_io_mode rdma_protocol;
89         char host[64];
90         struct sockaddr_in addr;
91
92         struct ibv_recv_wr rq_wr;
93         struct ibv_sge recv_sgl;
94         struct rdma_info_blk recv_buf;
95         struct ibv_mr *recv_mr;
96
97         struct ibv_send_wr sq_wr;
98         struct ibv_sge send_sgl;
99         struct rdma_info_blk send_buf;
100         struct ibv_mr *send_mr;
101
102         struct ibv_comp_channel *channel;
103         struct ibv_cq *cq;
104         struct ibv_pd *pd;
105         struct ibv_qp *qp;
106
107         pthread_t cmthread;
108         struct rdma_event_channel *cm_channel;
109         struct rdma_cm_id *cm_id;
110         struct rdma_cm_id *child_cm_id;
111
112         int cq_event_num;
113
114         struct remote_u *rmt_us;
115         int rmt_nr;
116         struct io_u **io_us_queued;
117         int io_u_queued_nr;
118         struct io_u **io_us_flight;
119         int io_u_flight_nr;
120         struct io_u **io_us_completed;
121         int io_u_completed_nr;
122 };
123
124 static int client_recv(struct thread_data *td, struct ibv_wc *wc)
125 {
126         struct rdmaio_data *rd = td->io_ops->data;
127
128         if (wc->byte_len != sizeof(rd->recv_buf)) {
129                 log_err("Received bogus data, size %d\n", wc->byte_len);
130                 return 1;
131         }
132
133         /* store mr info for MEMORY semantic */
134         if ((rd->rdma_protocol == FIO_RDMA_MEM_WRITE) ||
135             (rd->rdma_protocol == FIO_RDMA_MEM_READ)) {
136                 /* struct flist_head *entry; */
137                 int i = 0;
138
139                 rd->rmt_nr = ntohl(rd->recv_buf.nr);
140
141                 for (i = 0; i < rd->rmt_nr; i++) {
142                         rd->rmt_us[i].buf = ntohll(rd->recv_buf.rmt_us[i].buf);
143                         rd->rmt_us[i].rkey = ntohl(rd->recv_buf.rmt_us[i].rkey);
144                         rd->rmt_us[i].size = ntohl(rd->recv_buf.rmt_us[i].size);
145
146                         dprint(FD_IO,
147                                "fio: Received rkey %x addr %" PRIx64
148                                " len %d from peer\n", rd->rmt_us[i].rkey,
149                                rd->rmt_us[i].buf, rd->rmt_us[i].size);
150                 }
151         }
152
153         return 0;
154 }
155
156 static int server_recv(struct thread_data *td, struct ibv_wc *wc)
157 {
158         struct rdmaio_data *rd = td->io_ops->data;
159
160         if (wc->wr_id == FIO_RDMA_MAX_IO_DEPTH) {
161                 rd->rdma_protocol = ntohl(rd->recv_buf.mode);
162
163                 /* CHANNEL semantic, do nothing */
164                 if (rd->rdma_protocol == FIO_RDMA_CHA_SEND)
165                         rd->rdma_protocol = FIO_RDMA_CHA_RECV;
166         }
167
168         return 0;
169 }
170
171 static int cq_event_handler(struct thread_data *td, enum ibv_wc_opcode opcode)
172 {
173         struct rdmaio_data *rd = td->io_ops->data;
174         struct ibv_wc wc;
175         struct rdma_io_u_data *r_io_u_d;
176         int ret;
177         int compevnum = 0;
178         int i;
179
180         while ((ret = ibv_poll_cq(rd->cq, 1, &wc)) == 1) {
181                 ret = 0;
182                 compevnum++;
183
184                 if (wc.status) {
185                         log_err("fio: cq completion status %d(%s)\n",
186                                 wc.status, ibv_wc_status_str(wc.status));
187                         return -1;
188                 }
189
190                 switch (wc.opcode) {
191
192                 case IBV_WC_RECV:
193                         if (rd->is_client == 1)
194                                 client_recv(td, &wc);
195                         else
196                                 server_recv(td, &wc);
197
198                         if (wc.wr_id == FIO_RDMA_MAX_IO_DEPTH)
199                                 break;
200
201                         for (i = 0; i < rd->io_u_flight_nr; i++) {
202                                 r_io_u_d = rd->io_us_flight[i]->engine_data;
203
204                                 if (wc.wr_id == r_io_u_d->rq_wr.wr_id) {
205                                         rd->io_us_flight[i]->resid =
206                                             rd->io_us_flight[i]->buflen
207                                             - wc.byte_len;
208
209                                         rd->io_us_flight[i]->error = 0;
210
211                                         rd->io_us_completed[rd->
212                                                             io_u_completed_nr]
213                                             = rd->io_us_flight[i];
214                                         rd->io_u_completed_nr++;
215                                         break;
216                                 }
217                         }
218                         if (i == rd->io_u_flight_nr)
219                                 log_err("fio: recv wr %" PRId64 " not found\n",
220                                         wc.wr_id);
221                         else {
222                                 /* put the last one into middle of the list */
223                                 rd->io_us_flight[i] =
224                                     rd->io_us_flight[rd->io_u_flight_nr - 1];
225                                 rd->io_u_flight_nr--;
226                         }
227
228                         break;
229
230                 case IBV_WC_SEND:
231                 case IBV_WC_RDMA_WRITE:
232                 case IBV_WC_RDMA_READ:
233                         if (wc.wr_id == FIO_RDMA_MAX_IO_DEPTH)
234                                 break;
235
236                         for (i = 0; i < rd->io_u_flight_nr; i++) {
237                                 r_io_u_d = rd->io_us_flight[i]->engine_data;
238
239                                 if (wc.wr_id == r_io_u_d->sq_wr.wr_id) {
240                                         rd->io_us_completed[rd->
241                                                             io_u_completed_nr]
242                                             = rd->io_us_flight[i];
243                                         rd->io_u_completed_nr++;
244                                         break;
245                                 }
246                         }
247                         if (i == rd->io_u_flight_nr)
248                                 log_err("fio: send wr %" PRId64 " not found\n",
249                                         wc.wr_id);
250                         else {
251                                 /* put the last one into middle of the list */
252                                 rd->io_us_flight[i] =
253                                     rd->io_us_flight[rd->io_u_flight_nr - 1];
254                                 rd->io_u_flight_nr--;
255                         }
256
257                         break;
258
259                 default:
260                         log_info("fio: unknown completion event %d\n",
261                                  wc.opcode);
262                         return -1;
263                 }
264                 rd->cq_event_num++;
265         }
266         if (ret) {
267                 log_err("fio: poll error %d\n", ret);
268                 return 1;
269         }
270
271         return compevnum;
272 }
273
274 /*
275  * Return -1 for error and 'nr events' for a positive number
276  * of events
277  */
278 static int rdma_poll_wait(struct thread_data *td, enum ibv_wc_opcode opcode)
279 {
280         struct rdmaio_data *rd = td->io_ops->data;
281         struct ibv_cq *ev_cq;
282         void *ev_ctx;
283         int ret;
284
285         if (rd->cq_event_num > 0) {     /* previous left */
286                 rd->cq_event_num--;
287                 return 0;
288         }
289
290 again:
291         if (ibv_get_cq_event(rd->channel, &ev_cq, &ev_ctx) != 0) {
292                 log_err("fio: Failed to get cq event!\n");
293                 return -1;
294         }
295         if (ev_cq != rd->cq) {
296                 log_err("fio: Unknown CQ!\n");
297                 return -1;
298         }
299         if (ibv_req_notify_cq(rd->cq, 0) != 0) {
300                 log_err("fio: Failed to set notify!\n");
301                 return -1;
302         }
303
304         ret = cq_event_handler(td, opcode);
305         if (ret < 1)
306                 goto again;
307
308         ibv_ack_cq_events(rd->cq, ret);
309
310         rd->cq_event_num--;
311
312         return ret;
313 }
314
315 static int fio_rdmaio_setup_qp(struct thread_data *td)
316 {
317         struct rdmaio_data *rd = td->io_ops->data;
318         struct ibv_qp_init_attr init_attr;
319         int qp_depth = td->o.iodepth * 2;       /* 2 times of io depth */
320
321         if (rd->is_client == 0)
322                 rd->pd = ibv_alloc_pd(rd->child_cm_id->verbs);
323         else
324                 rd->pd = ibv_alloc_pd(rd->cm_id->verbs);
325         if (rd->pd == NULL) {
326                 log_err("fio: ibv_alloc_pd fail\n");
327                 return 1;
328         }
329
330         if (rd->is_client == 0)
331                 rd->channel = ibv_create_comp_channel(rd->child_cm_id->verbs);
332         else
333                 rd->channel = ibv_create_comp_channel(rd->cm_id->verbs);
334         if (rd->channel == NULL) {
335                 log_err("fio: ibv_create_comp_channel fail\n");
336                 goto err1;
337         }
338
339         if (qp_depth < 16)
340                 qp_depth = 16;
341
342         if (rd->is_client == 0)
343                 rd->cq = ibv_create_cq(rd->child_cm_id->verbs,
344                                        qp_depth, rd, rd->channel, 0);
345         else
346                 rd->cq = ibv_create_cq(rd->cm_id->verbs,
347                                        qp_depth, rd, rd->channel, 0);
348         if (rd->cq == NULL) {
349                 log_err("fio: ibv_create_cq failed\n");
350                 goto err2;
351         }
352
353         if (ibv_req_notify_cq(rd->cq, 0) != 0) {
354                 log_err("fio: ibv_create_cq failed\n");
355                 goto err3;
356         }
357
358         /* create queue pair */
359         memset(&init_attr, 0, sizeof(init_attr));
360         init_attr.cap.max_send_wr = qp_depth;
361         init_attr.cap.max_recv_wr = qp_depth;
362         init_attr.cap.max_recv_sge = 1;
363         init_attr.cap.max_send_sge = 1;
364         init_attr.qp_type = IBV_QPT_RC;
365         init_attr.send_cq = rd->cq;
366         init_attr.recv_cq = rd->cq;
367
368         if (rd->is_client == 0) {
369                 if (rdma_create_qp(rd->child_cm_id, rd->pd, &init_attr) != 0) {
370                         log_err("fio: rdma_create_qp failed\n");
371                         goto err3;
372                 }
373                 rd->qp = rd->child_cm_id->qp;
374         } else {
375                 if (rdma_create_qp(rd->cm_id, rd->pd, &init_attr) != 0) {
376                         log_err("fio: rdma_create_qp failed\n");
377                         goto err3;
378                 }
379                 rd->qp = rd->cm_id->qp;
380         }
381
382         return 0;
383
384 err3:
385         ibv_destroy_cq(rd->cq);
386 err2:
387         ibv_destroy_comp_channel(rd->channel);
388 err1:
389         ibv_dealloc_pd(rd->pd);
390
391         return 1;
392 }
393
394 static int fio_rdmaio_setup_control_msg_buffers(struct thread_data *td)
395 {
396         struct rdmaio_data *rd = td->io_ops->data;
397
398         rd->recv_mr = ibv_reg_mr(rd->pd, &rd->recv_buf, sizeof(rd->recv_buf),
399                                  IBV_ACCESS_LOCAL_WRITE);
400         if (rd->recv_mr == NULL) {
401                 log_err("fio: recv_buf reg_mr failed\n");
402                 return 1;
403         }
404
405         rd->send_mr = ibv_reg_mr(rd->pd, &rd->send_buf, sizeof(rd->send_buf),
406                                  0);
407         if (rd->send_mr == NULL) {
408                 log_err("fio: send_buf reg_mr failed\n");
409                 ibv_dereg_mr(rd->recv_mr);
410                 return 1;
411         }
412
413         /* setup work request */
414         /* recv wq */
415         rd->recv_sgl.addr = (uint64_t) (unsigned long)&rd->recv_buf;
416         rd->recv_sgl.length = sizeof rd->recv_buf;
417         rd->recv_sgl.lkey = rd->recv_mr->lkey;
418         rd->rq_wr.sg_list = &rd->recv_sgl;
419         rd->rq_wr.num_sge = 1;
420         rd->rq_wr.wr_id = FIO_RDMA_MAX_IO_DEPTH;
421
422         /* send wq */
423         rd->send_sgl.addr = (uint64_t) (unsigned long)&rd->send_buf;
424         rd->send_sgl.length = sizeof rd->send_buf;
425         rd->send_sgl.lkey = rd->send_mr->lkey;
426
427         rd->sq_wr.opcode = IBV_WR_SEND;
428         rd->sq_wr.send_flags = IBV_SEND_SIGNALED;
429         rd->sq_wr.sg_list = &rd->send_sgl;
430         rd->sq_wr.num_sge = 1;
431         rd->sq_wr.wr_id = FIO_RDMA_MAX_IO_DEPTH;
432
433         return 0;
434 }
435
436 static int get_next_channel_event(struct thread_data *td,
437                                   struct rdma_event_channel *channel,
438                                   enum rdma_cm_event_type wait_event)
439 {
440         struct rdmaio_data *rd = td->io_ops->data;
441
442         int ret;
443         struct rdma_cm_event *event;
444
445         ret = rdma_get_cm_event(channel, &event);
446         if (ret) {
447                 log_err("fio: rdma_get_cm_event");
448                 return 1;
449         }
450
451         if (event->event != wait_event) {
452                 log_err("fio: event is %s instead of %s\n",
453                         rdma_event_str(event->event),
454                         rdma_event_str(wait_event));
455                 return 1;
456         }
457
458         switch (event->event) {
459         case RDMA_CM_EVENT_CONNECT_REQUEST:
460                 rd->child_cm_id = event->id;
461                 break;
462         default:
463                 break;
464         }
465
466         rdma_ack_cm_event(event);
467
468         return 0;
469 }
470
471 static int fio_rdmaio_prep(struct thread_data *td, struct io_u *io_u)
472 {
473         struct rdmaio_data *rd = td->io_ops->data;
474         struct rdma_io_u_data *r_io_u_d;
475
476         r_io_u_d = io_u->engine_data;
477
478         switch (rd->rdma_protocol) {
479         case FIO_RDMA_MEM_WRITE:
480         case FIO_RDMA_MEM_READ:
481                 r_io_u_d->rdma_sgl.addr = (uint64_t) (unsigned long)io_u->buf;
482                 r_io_u_d->rdma_sgl.lkey = io_u->mr->lkey;
483                 r_io_u_d->sq_wr.wr_id = r_io_u_d->wr_id;
484                 r_io_u_d->sq_wr.send_flags = IBV_SEND_SIGNALED;
485                 r_io_u_d->sq_wr.sg_list = &r_io_u_d->rdma_sgl;
486                 r_io_u_d->sq_wr.num_sge = 1;
487                 break;
488         case FIO_RDMA_CHA_SEND:
489                 r_io_u_d->rdma_sgl.addr = (uint64_t) (unsigned long)io_u->buf;
490                 r_io_u_d->rdma_sgl.lkey = io_u->mr->lkey;
491                 r_io_u_d->rdma_sgl.length = io_u->buflen;
492                 r_io_u_d->sq_wr.wr_id = r_io_u_d->wr_id;
493                 r_io_u_d->sq_wr.opcode = IBV_WR_SEND;
494                 r_io_u_d->sq_wr.send_flags = IBV_SEND_SIGNALED;
495                 r_io_u_d->sq_wr.sg_list = &r_io_u_d->rdma_sgl;
496                 r_io_u_d->sq_wr.num_sge = 1;
497                 break;
498         case FIO_RDMA_CHA_RECV:
499                 r_io_u_d->rdma_sgl.addr = (uint64_t) (unsigned long)io_u->buf;
500                 r_io_u_d->rdma_sgl.lkey = io_u->mr->lkey;
501                 r_io_u_d->rdma_sgl.length = io_u->buflen;
502                 r_io_u_d->rq_wr.wr_id = r_io_u_d->wr_id;
503                 r_io_u_d->rq_wr.sg_list = &r_io_u_d->rdma_sgl;
504                 r_io_u_d->rq_wr.num_sge = 1;
505                 break;
506         default:
507                 log_err("fio: unknown rdma protocol - %d\n", rd->rdma_protocol);
508                 break;
509         }
510
511         return 0;
512 }
513
514 static struct io_u *fio_rdmaio_event(struct thread_data *td, int event)
515 {
516         struct rdmaio_data *rd = td->io_ops->data;
517         struct io_u *io_u;
518         int i;
519
520         io_u = rd->io_us_completed[0];
521         for (i = 0; i < rd->io_u_completed_nr - 1; i++) {
522                 rd->io_us_completed[i] = rd->io_us_completed[i + 1];
523         }
524         rd->io_u_completed_nr--;
525
526         dprint_io_u(io_u, "fio_rdmaio_event");
527
528         return io_u;
529 }
530
531 static int fio_rdmaio_getevents(struct thread_data *td, unsigned int min,
532                                 unsigned int max, struct timespec *t)
533 {
534         struct rdmaio_data *rd = td->io_ops->data;
535         int r;
536         enum ibv_wc_opcode comp_opcode;
537         comp_opcode = IBV_WC_RDMA_WRITE;
538         struct ibv_cq *ev_cq;
539         void *ev_ctx;
540         int ret;
541
542         r = 0;
543
544         switch (rd->rdma_protocol) {
545         case FIO_RDMA_MEM_WRITE:
546                 comp_opcode = IBV_WC_RDMA_WRITE;
547                 break;
548         case FIO_RDMA_MEM_READ:
549                 comp_opcode = IBV_WC_RDMA_READ;
550                 break;
551         case FIO_RDMA_CHA_SEND:
552                 comp_opcode = IBV_WC_SEND;
553                 break;
554         case FIO_RDMA_CHA_RECV:
555                 comp_opcode = IBV_WC_RECV;
556                 break;
557         default:
558                 log_err("fio: unknown rdma protocol - %d\n", rd->rdma_protocol);
559                 break;
560         }
561
562         if (rd->cq_event_num > 0) {     /* previous left */
563                 rd->cq_event_num--;
564                 return 0;
565         }
566
567 again:
568         if (ibv_get_cq_event(rd->channel, &ev_cq, &ev_ctx) != 0) {
569                 log_err("fio: Failed to get cq event!\n");
570                 return -1;
571         }
572         if (ev_cq != rd->cq) {
573                 log_err("fio: Unknown CQ!\n");
574                 return -1;
575         }
576         if (ibv_req_notify_cq(rd->cq, 0) != 0) {
577                 log_err("fio: Failed to set notify!\n");
578                 return -1;
579         }
580
581         ret = cq_event_handler(td, comp_opcode);
582         if (ret < 1)
583                 goto again;
584
585         ibv_ack_cq_events(rd->cq, ret);
586
587         r += ret;
588         if (r < min)
589                 goto again;
590
591         rd->cq_event_num -= r;
592
593         return r;
594 }
595
596 static int fio_rdmaio_send(struct thread_data *td, struct io_u **io_us,
597                            unsigned int nr)
598 {
599         struct rdmaio_data *rd = td->io_ops->data;
600         struct ibv_send_wr *bad_wr;
601 #if 0
602         enum ibv_wc_opcode comp_opcode;
603         comp_opcode = IBV_WC_RDMA_WRITE;
604 #endif
605         int i;
606         long index;
607         struct rdma_io_u_data *r_io_u_d;
608
609         r_io_u_d = NULL;
610
611         for (i = 0; i < nr; i++) {
612                 /* RDMA_WRITE or RDMA_READ */
613                 switch (rd->rdma_protocol) {
614                 case FIO_RDMA_MEM_WRITE:
615                         /* compose work request */
616                         r_io_u_d = io_us[i]->engine_data;
617                         if (td->o.use_os_rand)
618                                 index = os_random_long(&td->random_state) % rd->rmt_nr;
619                         else
620                                 index = __rand(&td->__random_state) % rd->rmt_nr;
621                         r_io_u_d->sq_wr.opcode = IBV_WR_RDMA_WRITE;
622                         r_io_u_d->sq_wr.wr.rdma.rkey = rd->rmt_us[index].rkey;
623                         r_io_u_d->sq_wr.wr.rdma.remote_addr = \
624                                 rd->rmt_us[index].buf;
625                         r_io_u_d->sq_wr.sg_list->length = io_us[i]->buflen;
626                         break;
627                 case FIO_RDMA_MEM_READ:
628                         /* compose work request */
629                         r_io_u_d = io_us[i]->engine_data;
630                         if (td->o.use_os_rand)
631                                 index = os_random_long(&td->random_state) % rd->rmt_nr;
632                         else
633                                 index = __rand(&td->__random_state) % rd->rmt_nr;
634                         r_io_u_d->sq_wr.opcode = IBV_WR_RDMA_READ;
635                         r_io_u_d->sq_wr.wr.rdma.rkey = rd->rmt_us[index].rkey;
636                         r_io_u_d->sq_wr.wr.rdma.remote_addr = \
637                                 rd->rmt_us[index].buf;
638                         r_io_u_d->sq_wr.sg_list->length = io_us[i]->buflen;
639                         break;
640                 case FIO_RDMA_CHA_SEND:
641                         r_io_u_d = io_us[i]->engine_data;
642                         r_io_u_d->sq_wr.opcode = IBV_WR_SEND;
643                         r_io_u_d->sq_wr.send_flags = IBV_SEND_SIGNALED;
644                         break;
645                 default:
646                         log_err("fio: unknown rdma protocol - %d\n",
647                                 rd->rdma_protocol);
648                         break;
649                 }
650
651                 if (ibv_post_send(rd->qp, &r_io_u_d->sq_wr, &bad_wr) != 0) {
652                         log_err("fio: ibv_post_send fail\n");
653                         return -1;
654                 }
655
656                 dprint_io_u(io_us[i], "fio_rdmaio_send");
657         }
658
659         /* wait for completion
660            rdma_poll_wait(td, comp_opcode); */
661
662         return i;
663 }
664
665 static int fio_rdmaio_recv(struct thread_data *td, struct io_u **io_us,
666                            unsigned int nr)
667 {
668         struct rdmaio_data *rd = td->io_ops->data;
669         struct ibv_recv_wr *bad_wr;
670         struct rdma_io_u_data *r_io_u_d;
671         int i;
672
673         i = 0;
674         if (rd->rdma_protocol == FIO_RDMA_CHA_RECV) {
675                 /* post io_u into recv queue */
676                 for (i = 0; i < nr; i++) {
677                         r_io_u_d = io_us[i]->engine_data;
678                         if (ibv_post_recv(rd->qp, &r_io_u_d->rq_wr, &bad_wr) !=
679                             0) {
680                                 log_err("fio: ibv_post_recv fail\n");
681                                 return 1;
682                         }
683                 }
684         } else if ((rd->rdma_protocol == FIO_RDMA_MEM_READ)
685                    || (rd->rdma_protocol == FIO_RDMA_MEM_WRITE)) {
686                 /* re-post the rq_wr */
687                 if (ibv_post_recv(rd->qp, &rd->rq_wr, &bad_wr) != 0) {
688                         log_err("fio: ibv_post_recv fail\n");
689                         return 1;
690                 }
691
692                 rdma_poll_wait(td, IBV_WC_RECV);
693
694                 dprint(FD_IO, "fio: recv FINISH message\n");
695                 td->done = 1;
696                 return 0;
697         }
698
699         return i;
700 }
701
702 static int fio_rdmaio_queue(struct thread_data *td, struct io_u *io_u)
703 {
704         struct rdmaio_data *rd = td->io_ops->data;
705
706         fio_ro_check(td, io_u);
707
708         if (rd->io_u_queued_nr == (int)td->o.iodepth)
709                 return FIO_Q_BUSY;
710
711         rd->io_us_queued[rd->io_u_queued_nr] = io_u;
712         rd->io_u_queued_nr++;
713
714         dprint_io_u(io_u, "fio_rdmaio_queue");
715
716         return FIO_Q_QUEUED;
717 }
718
719 static void fio_rdmaio_queued(struct thread_data *td, struct io_u **io_us,
720                               unsigned int nr)
721 {
722         struct rdmaio_data *rd = td->io_ops->data;
723         struct timeval now;
724         unsigned int i;
725
726         if (!fio_fill_issue_time(td))
727                 return;
728
729         fio_gettime(&now, NULL);
730
731         for (i = 0; i < nr; i++) {
732                 struct io_u *io_u = io_us[i];
733
734                 /* queued -> flight */
735                 rd->io_us_flight[rd->io_u_flight_nr] = io_u;
736                 rd->io_u_flight_nr++;
737
738                 memcpy(&io_u->issue_time, &now, sizeof(now));
739                 io_u_queued(td, io_u);
740         }
741 }
742
743 static int fio_rdmaio_commit(struct thread_data *td)
744 {
745         struct rdmaio_data *rd = td->io_ops->data;
746         struct io_u **io_us;
747         int ret;
748
749         if (!rd->io_us_queued)
750                 return 0;
751
752         io_us = rd->io_us_queued;
753         do {
754                 /* RDMA_WRITE or RDMA_READ */
755                 if (rd->is_client) {
756                         ret = fio_rdmaio_send(td, io_us, rd->io_u_queued_nr);
757                 } else if (!rd->is_client) {
758                         ret = fio_rdmaio_recv(td, io_us, rd->io_u_queued_nr);
759                 } else
760                         ret = 0;        /* must be a SYNC */
761
762                 if (ret > 0) {
763                         fio_rdmaio_queued(td, io_us, ret);
764                         io_u_mark_submit(td, ret);
765                         rd->io_u_queued_nr -= ret;
766                         io_us += ret;
767                         ret = 0;
768                 } else
769                         break;
770         } while (rd->io_u_queued_nr);
771
772         return ret;
773 }
774
775 static int fio_rdmaio_connect(struct thread_data *td, struct fio_file *f)
776 {
777         struct rdmaio_data *rd = td->io_ops->data;
778         struct rdma_conn_param conn_param;
779         struct ibv_send_wr *bad_wr;
780
781         memset(&conn_param, 0, sizeof conn_param);
782         conn_param.responder_resources = 1;
783         conn_param.initiator_depth = 1;
784         conn_param.retry_count = 10;
785
786         if (rdma_connect(rd->cm_id, &conn_param) != 0) {
787                 log_err("fio: rdma_connect fail\n");
788                 return 1;
789         }
790
791         if (get_next_channel_event
792             (td, rd->cm_channel, RDMA_CM_EVENT_ESTABLISHED) != 0) {
793                 log_err("fio: wait for RDMA_CM_EVENT_ESTABLISHED\n");
794                 return 1;
795         }
796
797         /* send task request */
798         rd->send_buf.mode = htonl(rd->rdma_protocol);
799         rd->send_buf.nr = htonl(td->o.iodepth);
800
801         if (ibv_post_send(rd->qp, &rd->sq_wr, &bad_wr) != 0) {
802                 log_err("fio: ibv_post_send fail");
803                 return 1;
804         }
805
806         rdma_poll_wait(td, IBV_WC_SEND);
807
808         /* wait for remote MR info from server side */
809         rdma_poll_wait(td, IBV_WC_RECV);
810
811         /* In SEND/RECV test, it's a good practice to setup the iodepth of
812          * of the RECV side deeper than that of the SEND side to
813          * avoid RNR (receiver not ready) error. The
814          * SEND side may send so many unsolicited message before 
815          * RECV side commits sufficient recv buffers into recv queue.
816          * This may lead to RNR error. Here, SEND side pauses for a while
817          * during which RECV side commits sufficient recv buffers.
818          */
819         usleep(500000);
820
821         return 0;
822 }
823
824 static int fio_rdmaio_accept(struct thread_data *td, struct fio_file *f)
825 {
826         struct rdmaio_data *rd = td->io_ops->data;
827         struct rdma_conn_param conn_param;
828         struct ibv_send_wr *bad_wr;
829
830         /* rdma_accept() - then wait for accept success */
831         memset(&conn_param, 0, sizeof conn_param);
832         conn_param.responder_resources = 1;
833         conn_param.initiator_depth = 1;
834
835         if (rdma_accept(rd->child_cm_id, &conn_param) != 0) {
836                 log_err("fio: rdma_accept\n");
837                 return 1;
838         }
839
840         if (get_next_channel_event
841             (td, rd->cm_channel, RDMA_CM_EVENT_ESTABLISHED) != 0) {
842                 log_err("fio: wait for RDMA_CM_EVENT_ESTABLISHED\n");
843                 return 1;
844         }
845
846         /* wait for request */
847         rdma_poll_wait(td, IBV_WC_RECV);
848
849         if (ibv_post_send(rd->qp, &rd->sq_wr, &bad_wr) != 0) {
850                 log_err("fio: ibv_post_send fail");
851                 return 1;
852         }
853
854         rdma_poll_wait(td, IBV_WC_SEND);
855
856         return 0;
857 }
858
859 static int fio_rdmaio_open_file(struct thread_data *td, struct fio_file *f)
860 {
861         if (td_read(td))
862                 return fio_rdmaio_accept(td, f);
863         else
864                 return fio_rdmaio_connect(td, f);
865 }
866
867 static int fio_rdmaio_close_file(struct thread_data *td, struct fio_file *f)
868 {
869         struct rdmaio_data *rd = td->io_ops->data;
870         struct ibv_send_wr *bad_wr;
871
872         /* unregister rdma buffer */
873
874         /*
875          * Client sends notification to the server side
876          */
877         /* refer to: http://linux.die.net/man/7/rdma_cm */
878         if ((rd->is_client == 1) && ((rd->rdma_protocol == FIO_RDMA_MEM_WRITE)
879                                      || (rd->rdma_protocol ==
880                                          FIO_RDMA_MEM_READ))) {
881                 if (ibv_post_send(rd->qp, &rd->sq_wr, &bad_wr) != 0) {
882                         log_err("fio: ibv_post_send fail");
883                         return 1;
884                 }
885
886                 dprint(FD_IO, "fio: close infomation sent success\n");
887                 rdma_poll_wait(td, IBV_WC_SEND);
888         }
889
890         if (rd->is_client == 1)
891                 rdma_disconnect(rd->cm_id);
892         else {
893                 rdma_disconnect(rd->child_cm_id);
894 /*        rdma_disconnect(rd->cm_id); */
895         }
896
897 /*    if (get_next_channel_event(td, rd->cm_channel, RDMA_CM_EVENT_DISCONNECTED) != 0)
898     {
899         log_err("fio: wait for RDMA_CM_EVENT_DISCONNECTED\n");
900         return 1;
901     }*/
902
903         ibv_destroy_cq(rd->cq);
904         ibv_destroy_qp(rd->qp);
905
906         if (rd->is_client == 1)
907                 rdma_destroy_id(rd->cm_id);
908         else {
909                 rdma_destroy_id(rd->child_cm_id);
910                 rdma_destroy_id(rd->cm_id);
911         }
912
913         ibv_destroy_comp_channel(rd->channel);
914         ibv_dealloc_pd(rd->pd);
915
916         return 0;
917 }
918
919 static int fio_rdmaio_setup_connect(struct thread_data *td, const char *host,
920                                     unsigned short port)
921 {
922         struct rdmaio_data *rd = td->io_ops->data;
923         struct ibv_recv_wr *bad_wr;
924
925         rd->addr.sin_family = AF_INET;
926         rd->addr.sin_port = htons(port);
927
928         if (inet_aton(host, &rd->addr.sin_addr) != 1) {
929                 struct hostent *hent;
930
931                 hent = gethostbyname(host);
932                 if (!hent) {
933                         td_verror(td, errno, "gethostbyname");
934                         return 1;
935                 }
936
937                 memcpy(&rd->addr.sin_addr, hent->h_addr, 4);
938         }
939
940         /* resolve route */
941         if (rdma_resolve_addr(rd->cm_id, NULL,
942                               (struct sockaddr *)&rd->addr, 2000) != 0) {
943                 log_err("fio: rdma_resolve_addr");
944                 return 1;
945         }
946
947         if (get_next_channel_event
948             (td, rd->cm_channel, RDMA_CM_EVENT_ADDR_RESOLVED)
949             != 0) {
950                 log_err("fio: get_next_channel_event");
951                 return 1;
952         }
953
954         /* resolve route */
955         if (rdma_resolve_route(rd->cm_id, 2000) != 0) {
956                 log_err("fio: rdma_resolve_route");
957                 return 1;
958         }
959
960         if (get_next_channel_event
961             (td, rd->cm_channel, RDMA_CM_EVENT_ROUTE_RESOLVED) != 0) {
962                 log_err("fio: get_next_channel_event");
963                 return 1;
964         }
965
966         /* create qp and buffer */
967         if (fio_rdmaio_setup_qp(td) != 0)
968                 return 1;
969
970         if (fio_rdmaio_setup_control_msg_buffers(td) != 0)
971                 return 1;
972
973         /* post recv buf */
974         if (ibv_post_recv(rd->qp, &rd->rq_wr, &bad_wr) != 0) {
975                 log_err("fio: ibv_post_recv fail\n");
976                 return 1;
977         }
978
979         return 0;
980 }
981
982 static int fio_rdmaio_setup_listen(struct thread_data *td, short port)
983 {
984         struct rdmaio_data *rd = td->io_ops->data;
985         struct ibv_recv_wr *bad_wr;
986
987         rd->addr.sin_family = AF_INET;
988         rd->addr.sin_addr.s_addr = htonl(INADDR_ANY);
989         rd->addr.sin_port = htons(port);
990
991         /* rdma_listen */
992         if (rdma_bind_addr(rd->cm_id, (struct sockaddr *)&rd->addr) != 0) {
993                 log_err("fio: rdma_bind_addr fail\n");
994                 return 1;
995         }
996
997         if (rdma_listen(rd->cm_id, 3) != 0) {
998                 log_err("fio: rdma_listen fail\n");
999                 return 1;
1000         }
1001
1002         /* wait for CONNECT_REQUEST */
1003         if (get_next_channel_event
1004             (td, rd->cm_channel, RDMA_CM_EVENT_CONNECT_REQUEST) != 0) {
1005                 log_err("fio: wait for RDMA_CM_EVENT_CONNECT_REQUEST\n");
1006                 return 1;
1007         }
1008
1009         if (fio_rdmaio_setup_qp(td) != 0)
1010                 return 1;
1011
1012         if (fio_rdmaio_setup_control_msg_buffers(td) != 0)
1013                 return 1;
1014
1015         /* post recv buf */
1016         if (ibv_post_recv(rd->qp, &rd->rq_wr, &bad_wr) != 0) {
1017                 log_err("fio: ibv_post_recv fail\n");
1018                 return 1;
1019         }
1020
1021         return 0;
1022 }
1023
1024 static int fio_rdmaio_init(struct thread_data *td)
1025 {
1026         struct rdmaio_data *rd = td->io_ops->data;
1027         unsigned int port;
1028         char host[64], buf[128];
1029         char *sep, *portp, *modep;
1030         int ret;
1031         struct rlimit rl;
1032
1033         if (td_rw(td)) {
1034                 log_err("fio: rdma connections must be read OR write\n");
1035                 return 1;
1036         }
1037         if (td_random(td)) {
1038                 log_err("fio: RDMA network IO can't be random\n");
1039                 return 1;
1040         }
1041
1042         /* check RLIMIT_MEMLOCK */
1043         if (getrlimit(RLIMIT_MEMLOCK, &rl) != 0) {
1044                 log_err("fio: getrlimit fail: %d(%s)\n",
1045                         errno, strerror(errno));
1046                 return 1;
1047         }
1048
1049         /* soft limit */
1050         if ((rl.rlim_cur != RLIM_INFINITY)
1051             && (rl.rlim_cur < td->orig_buffer_size)) {
1052                 log_err("fio: soft RLIMIT_MEMLOCK is: %" PRId64 "\n",
1053                         rl.rlim_cur);
1054                 log_err("fio: total block size is:    %zd\n",
1055                         td->orig_buffer_size);
1056                 /* try to set larger RLIMIT_MEMLOCK */
1057                 rl.rlim_cur = rl.rlim_max;
1058                 if (setrlimit(RLIMIT_MEMLOCK, &rl) != 0) {
1059                         log_err("fio: setrlimit fail: %d(%s)\n",
1060                                 errno, strerror(errno));
1061                         log_err("fio: you may try enlarge MEMLOCK by root\n");
1062                         log_err("# ulimit -l unlimited\n");
1063                         return 1;
1064                 }
1065         }
1066
1067         strcpy(buf, td->o.filename);
1068
1069         sep = strchr(buf, '/');
1070         if (!sep)
1071                 goto bad_host;
1072
1073         *sep = '\0';
1074         sep++;
1075         strcpy(host, buf);
1076         if (!strlen(host))
1077                 goto bad_host;
1078
1079         modep = NULL;
1080         portp = sep;
1081         sep = strchr(portp, '/');
1082         if (sep) {
1083                 *sep = '\0';
1084                 modep = sep + 1;
1085         }
1086
1087         port = strtol(portp, NULL, 10);
1088         if (!port || port > 65535)
1089                 goto bad_host;
1090
1091         if (modep) {
1092                 if (!strncmp("rdma_write", modep, strlen(modep)) ||
1093                     !strncmp("RDMA_WRITE", modep, strlen(modep)))
1094                         rd->rdma_protocol = FIO_RDMA_MEM_WRITE;
1095                 else if (!strncmp("rdma_read", modep, strlen(modep)) ||
1096                          !strncmp("RDMA_READ", modep, strlen(modep)))
1097                         rd->rdma_protocol = FIO_RDMA_MEM_READ;
1098                 else if (!strncmp("send", modep, strlen(modep)) ||
1099                          !strncmp("SEND", modep, strlen(modep)))
1100                         rd->rdma_protocol = FIO_RDMA_CHA_SEND;
1101                 else
1102                         goto bad_host;
1103         } else
1104                 rd->rdma_protocol = FIO_RDMA_MEM_WRITE;
1105
1106         rd->cq_event_num = 0;
1107
1108         rd->cm_channel = rdma_create_event_channel();
1109         if (!rd->cm_channel) {
1110                 log_err("fio: rdma_create_event_channel fail\n");
1111                 return 1;
1112         }
1113
1114         ret = rdma_create_id(rd->cm_channel, &rd->cm_id, rd, RDMA_PS_TCP);
1115         if (ret) {
1116                 log_err("fio: rdma_create_id fail\n");
1117                 return 1;
1118         }
1119
1120         if ((rd->rdma_protocol == FIO_RDMA_MEM_WRITE) ||
1121             (rd->rdma_protocol == FIO_RDMA_MEM_READ)) {
1122                 rd->rmt_us =
1123                         malloc(FIO_RDMA_MAX_IO_DEPTH * sizeof(struct remote_u));
1124                 memset(rd->rmt_us, 0,
1125                         FIO_RDMA_MAX_IO_DEPTH * sizeof(struct remote_u));
1126                 rd->rmt_nr = 0;
1127         }
1128
1129         rd->io_us_queued = malloc(td->o.iodepth * sizeof(struct io_u *));
1130         memset(rd->io_us_queued, 0, td->o.iodepth * sizeof(struct io_u *));
1131         rd->io_u_queued_nr = 0;
1132
1133         rd->io_us_flight = malloc(td->o.iodepth * sizeof(struct io_u *));
1134         memset(rd->io_us_flight, 0, td->o.iodepth * sizeof(struct io_u *));
1135         rd->io_u_flight_nr = 0;
1136
1137         rd->io_us_completed = malloc(td->o.iodepth * sizeof(struct io_u *));
1138         memset(rd->io_us_completed, 0, td->o.iodepth * sizeof(struct io_u *));
1139         rd->io_u_completed_nr = 0;
1140
1141         if (td_read(td)) {      /* READ as the server */
1142                 rd->is_client = 0;
1143                 /* server rd->rdma_buf_len will be setup after got request */
1144                 ret = fio_rdmaio_setup_listen(td, port);
1145         } else {                /* WRITE as the client */
1146                 rd->is_client = 1;
1147                 ret = fio_rdmaio_setup_connect(td, host, port);
1148         }
1149
1150         struct flist_head *entry;
1151         unsigned int max_bs;
1152         max_bs = max(td->o.max_bs[DDIR_READ], td->o.max_bs[DDIR_WRITE]);
1153         /* register each io_u in the free list */
1154         int i = 0;
1155         flist_for_each(entry, &td->io_u_freelist) {
1156                 struct io_u *io_u = flist_entry(entry, struct io_u, list);
1157
1158                 io_u->engine_data = malloc(sizeof(struct rdma_io_u_data));
1159                 memset(io_u->engine_data, 0, sizeof(struct rdma_io_u_data));
1160                 ((struct rdma_io_u_data *)io_u->engine_data)->wr_id = i;
1161
1162                 io_u->mr = ibv_reg_mr(rd->pd, io_u->buf, max_bs,
1163                                       IBV_ACCESS_LOCAL_WRITE |
1164                                       IBV_ACCESS_REMOTE_READ |
1165                                       IBV_ACCESS_REMOTE_WRITE);
1166                 if (io_u->mr == NULL) {
1167                         log_err("fio: ibv_reg_mr io_u failed\n");
1168                         return 1;
1169                 }
1170
1171                 rd->send_buf.rmt_us[i].buf =
1172                     htonll((uint64_t) (unsigned long)io_u->buf);
1173                 rd->send_buf.rmt_us[i].rkey = htonl(io_u->mr->rkey);
1174                 rd->send_buf.rmt_us[i].size = htonl(max_bs);
1175
1176 /*    log_info("fio: Send rkey %x addr %" PRIx64 " len %d to client\n",
1177           io_u->mr->rkey, io_u->buf, max_bs); */
1178                 i++;
1179         }
1180
1181         rd->send_buf.nr = htonl(i);
1182
1183         return ret;
1184 bad_host:
1185         log_err("fio: bad rdma host/port/protocol: %s\n", td->o.filename);
1186         return 1;
1187 }
1188
1189 static void fio_rdmaio_cleanup(struct thread_data *td)
1190 {
1191         struct rdmaio_data *rd = td->io_ops->data;
1192
1193         if (rd) {
1194 /*        if (nd->listenfd != -1)
1195             close(nd->listenfd);
1196         if (nd->pipes[0] != -1)
1197             close(nd->pipes[0]);
1198         if (nd->pipes[1] != -1)
1199             close(nd->pipes[1]);
1200 */
1201                 free(rd);
1202         }
1203 }
1204
1205 static int fio_rdmaio_setup(struct thread_data *td)
1206 {
1207         struct rdmaio_data *rd;
1208
1209         if (!td->io_ops->data) {
1210                 rd = malloc(sizeof(*rd));;
1211
1212                 memset(rd, 0, sizeof(*rd));
1213                 td->io_ops->data = rd;
1214         }
1215
1216         return 0;
1217 }
1218
1219 static struct ioengine_ops ioengine_rw = {
1220         .name           = "rdma",
1221         .version        = FIO_IOOPS_VERSION,
1222         .setup          = fio_rdmaio_setup,
1223         .init           = fio_rdmaio_init,
1224         .prep           = fio_rdmaio_prep,
1225         .queue          = fio_rdmaio_queue,
1226         .commit         = fio_rdmaio_commit,
1227         .getevents      = fio_rdmaio_getevents,
1228         .event          = fio_rdmaio_event,
1229         .cleanup        = fio_rdmaio_cleanup,
1230         .open_file      = fio_rdmaio_open_file,
1231         .close_file     = fio_rdmaio_close_file,
1232         .flags          = FIO_DISKLESSIO | FIO_UNIDIR | FIO_PIPEIO,
1233 };
1234
1235 #else /* FIO_HAVE_RDMA */
1236
1237 static int fio_rdmaio_open_file(struct thread_data *td, struct fio_file *f)
1238 {
1239         return 0;
1240 }
1241
1242 static int fio_rdmaio_close_file(struct thread_data *td, struct fio_file *f)
1243 {
1244         return 0;
1245 }
1246
1247 static int fio_rdmaio_queue(struct thread_data *td, struct io_u *io_u)
1248 {
1249         return FIO_Q_COMPLETED;
1250 }
1251
1252 static int fio_rdmaio_init(struct thread_data fio_unused * td)
1253 {
1254         log_err("fio: rdma(librdmacm libibverbs) not available\n");
1255         log_err("     You haven't compiled rdma ioengine into fio.\n");
1256         log_err("     If you want to try rdma ioengine,\n");
1257         log_err("     make sure OFED is installed,\n");
1258         log_err("     $ ofed_info\n");
1259         log_err("     then try to make fio as follows:\n");
1260         log_err("     $ export EXTFLAGS+=\" -DFIO_HAVE_RDMA \"\n");
1261         log_err("     $ export EXTLIBS+=\" -libverbs -lrdmacm \"\n");
1262         log_err("     $ make clean && make\n");
1263         return 1;
1264 }
1265
1266 static struct ioengine_ops ioengine_rw = {
1267         .name           = "rdma",
1268         .version        = FIO_IOOPS_VERSION,
1269         .init           = fio_rdmaio_init,
1270         .queue          = fio_rdmaio_queue,
1271         .open_file      = fio_rdmaio_open_file,
1272         .close_file     = fio_rdmaio_close_file,
1273         .flags          = FIO_SYNCIO | FIO_DISKLESSIO | FIO_UNIDIR | FIO_PIPEIO,
1274 };
1275
1276 #endif
1277
1278 static void fio_init fio_rdmaio_register(void)
1279 {
1280         register_ioengine(&ioengine_rw);
1281 }
1282
1283 static void fio_exit fio_rdmaio_unregister(void)
1284 {
1285         unregister_ioengine(&ioengine_rw);
1286 }