From: ren yufei
Date: Mon, 1 Aug 2011 08:01:57 +0000 (+0200)
Subject: RDMA IO engine
X-Git-Tag: fio-1.58~38
X-Git-Url: https://git.kernel.dk/?p=fio.git;a=commitdiff_plain;h=21b8aee865f0d3960687ce6ba7385e5977f45061;hp=b939683e7b997653a35bd7d55f733fc96030b246

RDMA IO engine

I have written an RDMA ioengine for fio, based on OFED, which can
exercise both the RDMA memory semantics (rdma_write/rdma_read) and the
channel semantics (send/recv). Would you like to merge this engine into
fio?

Notes:

1) The RDMA engine works with IB, iWarp and RoCE.
2) The RDMA engine is disabled by default. To enable it, execute the
   following before compiling:

   $ export EXTFLAGS="-DFIO_HAVE_RDMA"
   $ export EXTLIBS="-libverbs -lrdmacm"

Signed-off-by: Jens Axboe
---

diff --git a/HOWTO b/HOWTO
index 85704efc..5e9b390b 100644
--- a/HOWTO
+++ b/HOWTO
@@ -568,6 +568,11 @@ ioengine=str	Defines how the job issues io to the file. The following
 			for more info on GUASI.
 
+			rdma	The RDMA I/O engine supports both RDMA
+				memory semantics (RDMA_WRITE/RDMA_READ) and
+				channel semantics (Send/Recv) in InfiniBand,
+				RoCE and iWarp environments.
+
 			external Prefix to specify loading an external
 				IO engine object file. Append the engine
 				filename, eg ioengine=external:/tmp/foo.o
diff --git a/Makefile b/Makefile
index a80f4ca1..e48c6e0e 100644
--- a/Makefile
+++ b/Makefile
@@ -4,7 +4,7 @@ CPPFLAGS= -D_GNU_SOURCE -D_LARGEFILE_SOURCE -D_FILE_OFFSET_BITS=64 \
 	$(DEBUGFLAGS)
 OPTFLAGS= -O2 -fno-omit-frame-pointer -g $(EXTFLAGS)
 CFLAGS	= -std=gnu99 -Wwrite-strings -Wall $(OPTFLAGS)
-LIBS	= -lm
+LIBS	= -lm $(EXTLIBS)
 PROGS	= fio
 SCRIPTS = fio_generate_plots
 UNAME	:= $(shell uname)
@@ -20,7 +20,7 @@ ifeq ($(UNAME), Linux)
   SOURCE += diskutil.c fifo.c blktrace.c helpers.c cgroup.c trim.c \
 		engines/libaio.c engines/posixaio.c engines/sg.c \
 		engines/splice.c engines/syslet-rw.c engines/guasi.c \
-		engines/binject.c profiles/tiobench.c
+		engines/binject.c engines/rdma.c profiles/tiobench.c
   LIBS += -lpthread -ldl -lrt -laio
   CFLAGS += -rdynamic
 endif
diff --git a/engines/rdma.c b/engines/rdma.c
new file mode 100644
index 00000000..2eeb8972
--- /dev/null
+++ b/engines/rdma.c
@@ -0,0 +1,1253 @@
+/*
+ * rdma engine
+ *
+ * RDMA IO engine using the OFED library.
+ * Supports both RDMA memory semantics and channel semantics
+ * in InfiniBand, RoCE and iWarp environments.
+ *
+ * This engine is currently disabled. To enable it, execute:
+ *
+ * $ export EXTFLAGS="-DFIO_HAVE_RDMA"
+ * $ export EXTLIBS="-libverbs -lrdmacm"
+ *
+ * before running make. You'll need OFED installed as well:
+ *
+ * http://www.openfabrics.org/downloads/OFED/
+ *
+ */
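+
+/*
+ * Job file syntax, as used by the example jobs added in this commit
+ * (examples/rdmaio-client and examples/rdmaio-server):
+ *
+ *   ioengine=rdma
+ *   filename=[ip_addr]/[port]/[RDMA_WRITE|RDMA_READ|SEND]   (client)
+ *   filename=[ip_addr]/[port]                               (server)
+ *
+ * The trailing protocol field is optional and defaults to RDMA_WRITE;
+ * see fio_rdmaio_init() below.
+ */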
+#include <stdio.h>
+#include <stdlib.h>
+#include <unistd.h>
+#include <errno.h>
+#include <assert.h>
+#include <netinet/in.h>
+#include <arpa/inet.h>
+#include <netdb.h>
+#include <sys/poll.h>
+#include <sys/types.h>
+#include <sys/socket.h>
+#include <sys/time.h>
+#include <sys/resource.h>
+
+#include <byteswap.h>
+#include <pthread.h>
+#include <inttypes.h>
+
+#include "../fio.h"
+
+#ifdef FIO_HAVE_RDMA
+
+#include <rdma/rdma_cma.h>
+#include <infiniband/arch.h>
+
+#define FIO_RDMA_MAX_IO_DEPTH	128
+
+enum rdma_io_mode {
+	FIO_RDMA_UNKNOWN = 0,
+	FIO_RDMA_MEM_WRITE,
+	FIO_RDMA_MEM_READ,
+	FIO_RDMA_CHA_SEND,
+	FIO_RDMA_CHA_RECV
+};
+
+struct remote_u {
+	uint64_t buf;
+	uint32_t rkey;
+	uint32_t size;
+};
+
+struct rdma_info_blk {
+	uint32_t mode;		/* channel semantic or memory semantic */
+	uint32_t nr;		/* client: io depth
+				   server: number of records for memory
+				   semantic */
+	struct remote_u rmt_us[FIO_RDMA_MAX_IO_DEPTH];
+};
+
+struct rdma_io_u_data {
+	uint64_t wr_id;
+	struct ibv_send_wr sq_wr;
+	struct ibv_recv_wr rq_wr;
+	struct ibv_sge rdma_sgl;
+};
+
+struct rdmaio_data {
+	int is_client;
+	enum rdma_io_mode rdma_protocol;
+	char host[64];
+	struct sockaddr_in addr;
+
+	struct ibv_recv_wr rq_wr;
+	struct ibv_sge recv_sgl;
+	struct rdma_info_blk recv_buf;
+	struct ibv_mr *recv_mr;
+
+	struct ibv_send_wr sq_wr;
+	struct ibv_sge send_sgl;
+	struct rdma_info_blk send_buf;
+	struct ibv_mr *send_mr;
+
+	struct ibv_comp_channel *channel;
+	struct ibv_cq *cq;
+	struct ibv_pd *pd;
+	struct ibv_qp *qp;
+
+	pthread_t cmthread;
+	struct rdma_event_channel *cm_channel;
+	struct rdma_cm_id *cm_id;
+	struct rdma_cm_id *child_cm_id;
+
+	int cq_event_num;
+
+	struct remote_u *rmt_us;
+	int rmt_nr;
+	struct io_u **io_us_queued;
+	int io_u_queued_nr;
+	struct io_u **io_us_flight;
+	int io_u_flight_nr;
+	struct io_u **io_us_completed;
+	int io_u_completed_nr;
+};
+
+static int client_recv(struct thread_data *td, struct ibv_wc *wc)
+{
+	struct rdmaio_data *rd = td->io_ops->data;
+
+	if (wc->byte_len != sizeof(rd->recv_buf)) {
+		fprintf(stderr, "Received bogus data, size %d\n", wc->byte_len);
+		return 1;
+	}
+
+	/* store mr info for MEMORY semantic */
+	if ((rd->rdma_protocol == FIO_RDMA_MEM_WRITE) ||
+	    (rd->rdma_protocol == FIO_RDMA_MEM_READ)) {
+		/* struct flist_head *entry; */
+		int i = 0;
+
+		rd->rmt_nr = ntohl(rd->recv_buf.nr);
+
+		for (i = 0; i < rd->rmt_nr; i++) {
+			rd->rmt_us[i].buf = ntohll(rd->recv_buf.rmt_us[i].buf);
+			rd->rmt_us[i].rkey = ntohl(rd->recv_buf.rmt_us[i].rkey);
+			rd->rmt_us[i].size = ntohl(rd->recv_buf.rmt_us[i].size);
+
+			dprint(FD_IO,
+			       "fio: Received rkey %x addr %" PRIx64
+			       " len %d from peer\n", rd->rmt_us[i].rkey,
+			       rd->rmt_us[i].buf, rd->rmt_us[i].size);
+		}
+	}
+
+	return 0;
+}
+
+static int server_recv(struct thread_data *td, struct ibv_wc *wc)
+{
+	struct rdmaio_data *rd = td->io_ops->data;
+
+	if (wc->wr_id == FIO_RDMA_MAX_IO_DEPTH) {
+		rd->rdma_protocol = ntohl(rd->recv_buf.mode);
+
+		/* CHANNEL semantic, do nothing */
+		if (rd->rdma_protocol == FIO_RDMA_CHA_SEND)
+			rd->rdma_protocol = FIO_RDMA_CHA_RECV;
+	}
+
+	return 0;
+}
+
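+/*
+ * Drain the completion queue. Completions for the control message
+ * (wr_id == FIO_RDMA_MAX_IO_DEPTH) are handled by the recv hooks above;
+ * all other completions are matched by wr_id against the in-flight io_u
+ * list and moved to the completed list.
+ */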
+static int cq_event_handler(struct thread_data *td, enum ibv_wc_opcode opcode)
+{
+	struct rdmaio_data *rd = td->io_ops->data;
+	struct ibv_wc wc;
+	struct rdma_io_u_data *r_io_u_d;
+	int ret;
+	int compevnum = 0;
+	int i;
+
+	while ((ret = ibv_poll_cq(rd->cq, 1, &wc)) == 1) {
+		ret = 0;
+		compevnum++;
+
+		if (wc.status) {
+			log_err("fio: cq completion status %d(%s)\n",
+				wc.status, ibv_wc_status_str(wc.status));
+			return -1;
+		}
+
+		switch (wc.opcode) {
+
+		case IBV_WC_RECV:
+			if (rd->is_client == 1)
+				client_recv(td, &wc);
+			else
+				server_recv(td, &wc);
+
+			if (wc.wr_id == FIO_RDMA_MAX_IO_DEPTH)
+				break;
+
+			for (i = 0; i < rd->io_u_flight_nr; i++) {
+				r_io_u_d = rd->io_us_flight[i]->engine_data;
+
+				if (wc.wr_id == r_io_u_d->rq_wr.wr_id) {
+					rd->io_us_flight[i]->resid =
+					    rd->io_us_flight[i]->buflen
+					    - wc.byte_len;
+
+					rd->io_us_flight[i]->error = 0;
+
+					rd->io_us_completed[rd->
+							    io_u_completed_nr]
+					    = rd->io_us_flight[i];
+					rd->io_u_completed_nr++;
+					break;
+				}
+			}
+			if (i == rd->io_u_flight_nr)
+				log_err("fio: recv wr %ld not found\n",
+					wc.wr_id);
+			else {
+				/* move the last one into the vacated slot */
+				rd->io_us_flight[i] =
+				    rd->io_us_flight[rd->io_u_flight_nr - 1];
+				rd->io_u_flight_nr--;
+			}
+
+			break;
+
+		case IBV_WC_SEND:
+		case IBV_WC_RDMA_WRITE:
+		case IBV_WC_RDMA_READ:
+			if (wc.wr_id == FIO_RDMA_MAX_IO_DEPTH)
+				break;
+
+			for (i = 0; i < rd->io_u_flight_nr; i++) {
+				r_io_u_d = rd->io_us_flight[i]->engine_data;
+
+				if (wc.wr_id == r_io_u_d->sq_wr.wr_id) {
+					rd->io_us_completed[rd->
+							    io_u_completed_nr]
+					    = rd->io_us_flight[i];
+					rd->io_u_completed_nr++;
+					break;
+				}
+			}
+			if (i == rd->io_u_flight_nr)
+				log_err("fio: send wr %ld not found\n",
+					wc.wr_id);
+			else {
+				/* move the last one into the vacated slot */
+				rd->io_us_flight[i] =
+				    rd->io_us_flight[rd->io_u_flight_nr - 1];
+				rd->io_u_flight_nr--;
+			}
+
+			break;
+
+		default:
+			log_info("fio: unknown completion event %d\n",
+				 wc.opcode);
+			return -1;
+		}
+		rd->cq_event_num++;
+	}
+	if (ret) {
+		log_err("fio: poll error %d\n", ret);
+		return 1;
+	}
+
+	return compevnum;
+}
+
+/*
+ * Returns -1 on error, or the number of events handled on success.
+ */
+static int rdma_poll_wait(struct thread_data *td, enum ibv_wc_opcode opcode)
+{
+	struct rdmaio_data *rd = td->io_ops->data;
+	struct ibv_cq *ev_cq;
+	void *ev_ctx;
+	int ret;
+
+	if (rd->cq_event_num > 0) {	/* previous left */
+		rd->cq_event_num--;
+		return 0;
+	}
+
+again:
+	if (ibv_get_cq_event(rd->channel, &ev_cq, &ev_ctx) != 0) {
+		log_err("fio: Failed to get cq event!\n");
+		return -1;
+	}
+	if (ev_cq != rd->cq) {
+		log_err("fio: Unknown CQ!\n");
+		return -1;
+	}
+	if (ibv_req_notify_cq(rd->cq, 0) != 0) {
+		log_err("fio: Failed to set notify!\n");
+		return -1;
+	}
+
+	ret = cq_event_handler(td, opcode);
+	if (ret < 1)
+		goto again;
+
+	ibv_ack_cq_events(rd->cq, ret);
+
+	rd->cq_event_num--;
+
+	return ret;
+}
+
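+/*
+ * Allocate the protection domain, completion channel, CQ and an RC QP
+ * on the client's cm_id or the server's child_cm_id. The CQ is sized
+ * to twice the I/O depth (minimum 16) so that send and recv work
+ * requests both fit.
+ */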
+static int fio_rdmaio_setup_qp(struct thread_data *td)
+{
+	struct rdmaio_data *rd = td->io_ops->data;
+	struct ibv_qp_init_attr init_attr;
+	int qp_depth = td->o.iodepth * 2;	/* double the I/O depth */
+
+	if (rd->is_client == 0)
+		rd->pd = ibv_alloc_pd(rd->child_cm_id->verbs);
+	else
+		rd->pd = ibv_alloc_pd(rd->cm_id->verbs);
+	if (rd->pd == NULL) {
+		log_err("fio: ibv_alloc_pd fail\n");
+		return 1;
+	}
+
+	if (rd->is_client == 0)
+		rd->channel = ibv_create_comp_channel(rd->child_cm_id->verbs);
+	else
+		rd->channel = ibv_create_comp_channel(rd->cm_id->verbs);
+	if (rd->channel == NULL) {
+		log_err("fio: ibv_create_comp_channel fail\n");
+		goto err1;
+	}
+
+	if (qp_depth < 16)
+		qp_depth = 16;
+
+	if (rd->is_client == 0)
+		rd->cq = ibv_create_cq(rd->child_cm_id->verbs,
+				       qp_depth, rd, rd->channel, 0);
+	else
+		rd->cq = ibv_create_cq(rd->cm_id->verbs,
+				       qp_depth, rd, rd->channel, 0);
+	if (rd->cq == NULL) {
+		log_err("fio: ibv_create_cq failed\n");
+		goto err2;
+	}
+
+	if (ibv_req_notify_cq(rd->cq, 0) != 0) {
+		log_err("fio: ibv_req_notify_cq failed\n");
+		goto err3;
+	}
+
+	/* create queue pair */
+	memset(&init_attr, 0, sizeof(init_attr));
+	init_attr.cap.max_send_wr = qp_depth;
+	init_attr.cap.max_recv_wr = qp_depth;
+	init_attr.cap.max_recv_sge = 1;
+	init_attr.cap.max_send_sge = 1;
+	init_attr.qp_type = IBV_QPT_RC;
+	init_attr.send_cq = rd->cq;
+	init_attr.recv_cq = rd->cq;
+
+	if (rd->is_client == 0) {
+		if (rdma_create_qp(rd->child_cm_id, rd->pd, &init_attr) != 0) {
+			log_err("fio: rdma_create_qp failed\n");
+			goto err3;
+		}
+		rd->qp = rd->child_cm_id->qp;
+	} else {
+		if (rdma_create_qp(rd->cm_id, rd->pd, &init_attr) != 0) {
+			log_err("fio: rdma_create_qp failed\n");
+			goto err3;
+		}
+		rd->qp = rd->cm_id->qp;
+	}
+
+	return 0;
+
+err3:
+	ibv_destroy_cq(rd->cq);
+err2:
+	ibv_destroy_comp_channel(rd->channel);
+err1:
+	ibv_dealloc_pd(rd->pd);
+
+	return 1;
+}
+
+static int fio_rdmaio_setup_control_msg_buffers(struct thread_data *td)
+{
+	struct rdmaio_data *rd = td->io_ops->data;
+
+	rd->recv_mr = ibv_reg_mr(rd->pd, &rd->recv_buf, sizeof(rd->recv_buf),
+				 IBV_ACCESS_LOCAL_WRITE);
+	if (rd->recv_mr == NULL) {
+		log_err("fio: recv_buf reg_mr failed\n");
+		return 1;
+	}
+
+	rd->send_mr = ibv_reg_mr(rd->pd, &rd->send_buf, sizeof(rd->send_buf),
+				 0);
+	if (rd->send_mr == NULL) {
+		log_err("fio: send_buf reg_mr failed\n");
+		ibv_dereg_mr(rd->recv_mr);
+		return 1;
+	}
+
+	/* setup work request */
+	/* recv wq */
+	rd->recv_sgl.addr = (uint64_t) (unsigned long)&rd->recv_buf;
+	rd->recv_sgl.length = sizeof rd->recv_buf;
+	rd->recv_sgl.lkey = rd->recv_mr->lkey;
+	rd->rq_wr.sg_list = &rd->recv_sgl;
+	rd->rq_wr.num_sge = 1;
+	rd->rq_wr.wr_id = FIO_RDMA_MAX_IO_DEPTH;
+
+	/* send wq */
+	rd->send_sgl.addr = (uint64_t) (unsigned long)&rd->send_buf;
+	rd->send_sgl.length = sizeof rd->send_buf;
+	rd->send_sgl.lkey = rd->send_mr->lkey;
+
+	rd->sq_wr.opcode = IBV_WR_SEND;
+	rd->sq_wr.send_flags = IBV_SEND_SIGNALED;
+	rd->sq_wr.sg_list = &rd->send_sgl;
+	rd->sq_wr.num_sge = 1;
+	rd->sq_wr.wr_id = FIO_RDMA_MAX_IO_DEPTH;
+
+	return 0;
+}
+
+static int get_next_channel_event(struct thread_data *td,
+				  struct rdma_event_channel *channel,
+				  enum rdma_cm_event_type wait_event)
+{
+	struct rdmaio_data *rd = td->io_ops->data;
+
+	int ret;
+	struct rdma_cm_event *event;
+
+	ret = rdma_get_cm_event(channel, &event);
+	if (ret) {
+		log_err("fio: rdma_get_cm_event\n");
+		return 1;
+	}
+
+	if (event->event != wait_event) {
+		log_err("fio: event is %s instead of %s\n",
+			rdma_event_str(event->event),
+			rdma_event_str(wait_event));
+		return 1;
+	}
+
+	switch (event->event) {
+	case RDMA_CM_EVENT_CONNECT_REQUEST:
+		rd->child_cm_id = event->id;
+		break;
+	default:
+		break;
+	}
+
+	rdma_ack_cm_event(event);
+
+	return 0;
+}
+
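+/*
+ * Fill in the per-io_u work request: a send queue entry for the memory
+ * semantics and SEND, a receive queue entry for RECV.
+ */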
+static int fio_rdmaio_prep(struct thread_data *td, struct io_u *io_u)
+{
+	struct rdmaio_data *rd = td->io_ops->data;
+	struct rdma_io_u_data *r_io_u_d;
+
+	r_io_u_d = io_u->engine_data;
+
+	switch (rd->rdma_protocol) {
+	case FIO_RDMA_MEM_WRITE:
+	case FIO_RDMA_MEM_READ:
+		r_io_u_d->rdma_sgl.addr = (uint64_t) (unsigned long)io_u->buf;
+		r_io_u_d->rdma_sgl.lkey = io_u->mr->lkey;
+		r_io_u_d->sq_wr.wr_id = r_io_u_d->wr_id;
+		r_io_u_d->sq_wr.send_flags = IBV_SEND_SIGNALED;
+		r_io_u_d->sq_wr.sg_list = &r_io_u_d->rdma_sgl;
+		r_io_u_d->sq_wr.num_sge = 1;
+		break;
+	case FIO_RDMA_CHA_SEND:
+		r_io_u_d->rdma_sgl.addr = (uint64_t) (unsigned long)io_u->buf;
+		r_io_u_d->rdma_sgl.lkey = io_u->mr->lkey;
+		r_io_u_d->rdma_sgl.length = io_u->buflen;
+		r_io_u_d->sq_wr.wr_id = r_io_u_d->wr_id;
+		r_io_u_d->sq_wr.opcode = IBV_WR_SEND;
+		r_io_u_d->sq_wr.send_flags = IBV_SEND_SIGNALED;
+		r_io_u_d->sq_wr.sg_list = &r_io_u_d->rdma_sgl;
+		r_io_u_d->sq_wr.num_sge = 1;
+		break;
+	case FIO_RDMA_CHA_RECV:
+		r_io_u_d->rdma_sgl.addr = (uint64_t) (unsigned long)io_u->buf;
+		r_io_u_d->rdma_sgl.lkey = io_u->mr->lkey;
+		r_io_u_d->rdma_sgl.length = io_u->buflen;
+		r_io_u_d->rq_wr.wr_id = r_io_u_d->wr_id;
+		r_io_u_d->rq_wr.sg_list = &r_io_u_d->rdma_sgl;
+		r_io_u_d->rq_wr.num_sge = 1;
+		break;
+	default:
+		log_err("fio: unknown rdma protocol - %d\n", rd->rdma_protocol);
+		break;
+	}
+
+	return 0;
+}
+
+static struct io_u *fio_rdmaio_event(struct thread_data *td, int event)
+{
+	struct rdmaio_data *rd = td->io_ops->data;
+	struct io_u *io_u;
+	int i;
+
+	io_u = rd->io_us_completed[0];
+	for (i = 0; i < rd->io_u_completed_nr - 1; i++) {
+		rd->io_us_completed[i] = rd->io_us_completed[i + 1];
+	}
+	rd->io_u_completed_nr--;
+
+	dprint_io_u(io_u, "fio_rdmaio_event");
+
+	return io_u;
+}
+
+static int fio_rdmaio_getevents(struct thread_data *td, unsigned int min,
+				unsigned int max, struct timespec *t)
+{
+	struct rdmaio_data *rd = td->io_ops->data;
+	enum ibv_wc_opcode comp_opcode = IBV_WC_RDMA_WRITE;
+	struct ibv_cq *ev_cq;
+	void *ev_ctx;
+	int ret, r = 0;
+
+	switch (rd->rdma_protocol) {
+	case FIO_RDMA_MEM_WRITE:
+		comp_opcode = IBV_WC_RDMA_WRITE;
+		break;
+	case FIO_RDMA_MEM_READ:
+		comp_opcode = IBV_WC_RDMA_READ;
+		break;
+	case FIO_RDMA_CHA_SEND:
+		comp_opcode = IBV_WC_SEND;
+		break;
+	case FIO_RDMA_CHA_RECV:
+		comp_opcode = IBV_WC_RECV;
+		break;
+	default:
+		log_err("fio: unknown rdma protocol - %d\n", rd->rdma_protocol);
+		break;
+	}
+
+	if (rd->cq_event_num > 0) {	/* previous left */
+		rd->cq_event_num--;
+		return 0;
+	}
+
+again:
+	if (ibv_get_cq_event(rd->channel, &ev_cq, &ev_ctx) != 0) {
+		log_err("fio: Failed to get cq event!\n");
+		return -1;
+	}
+	if (ev_cq != rd->cq) {
+		log_err("fio: Unknown CQ!\n");
+		return -1;
+	}
+	if (ibv_req_notify_cq(rd->cq, 0) != 0) {
+		log_err("fio: Failed to set notify!\n");
+		return -1;
+	}
+
+	ret = cq_event_handler(td, comp_opcode);
+	if (ret < 1)
+		goto again;
+
+	ibv_ack_cq_events(rd->cq, ret);
+
+	r += ret;
+	if (r < min)
+		goto again;
+
+	rd->cq_event_num -= r;
+
+	return r;
+}
+
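+/*
+ * Client-side submit path. For the memory semantics, each io_u is
+ * directed at a randomly chosen remote buffer from the table the
+ * server advertised at connect time.
+ */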
"fio_rdmaio_send"); + } + + /* wait for completion + rdma_poll_wait(td, comp_opcode); */ + + return i; +} + +static int fio_rdmaio_recv(struct thread_data *td, struct io_u **io_us, + unsigned int nr) +{ + struct rdmaio_data *rd = td->io_ops->data; + struct ibv_recv_wr *bad_wr; + struct rdma_io_u_data *r_io_u_d; + int i; + + i = 0; + if (rd->rdma_protocol == FIO_RDMA_CHA_RECV) { + /* post io_u into recv queue */ + for (i = 0; i < nr; i++) { + r_io_u_d = io_us[i]->engine_data; + if (ibv_post_recv(rd->qp, &r_io_u_d->rq_wr, &bad_wr) != + 0) { + log_err("fio: ibv_post_recv fail\n"); + return 1; + } + } + } else if ((rd->rdma_protocol == FIO_RDMA_MEM_READ) + || (rd->rdma_protocol == FIO_RDMA_MEM_WRITE)) { + /* re-post the rq_wr */ + if (ibv_post_recv(rd->qp, &rd->rq_wr, &bad_wr) != 0) { + log_err("fio: ibv_post_recv fail\n"); + return 1; + } + + rdma_poll_wait(td, IBV_WC_RECV); + + dprint(FD_IO, "fio: recv FINISH message\n"); + exit(0); + } + + return i; +} + +static int fio_rdmaio_queue(struct thread_data *td, struct io_u *io_u) +{ + struct rdmaio_data *rd = td->io_ops->data; + + fio_ro_check(td, io_u); + + if (rd->io_u_queued_nr == (int)td->o.iodepth) + return FIO_Q_BUSY; + + rd->io_us_queued[rd->io_u_queued_nr] = io_u; + rd->io_u_queued_nr++; + + dprint_io_u(io_u, "fio_rdmaio_queue"); + + return FIO_Q_QUEUED; +} + +static void fio_rdmaio_queued(struct thread_data *td, struct io_u **io_us, + unsigned int nr) +{ + struct rdmaio_data *rd = td->io_ops->data; + struct timeval now; + unsigned int i; + + if (!fio_fill_issue_time(td)) + return; + + fio_gettime(&now, NULL); + + for (i = 0; i < nr; i++) { + struct io_u *io_u = io_us[i]; + + /* queued -> flight */ + rd->io_us_flight[rd->io_u_flight_nr] = io_u; + rd->io_u_flight_nr++; + + memcpy(&io_u->issue_time, &now, sizeof(now)); + io_u_queued(td, io_u); + } +} + +static int fio_rdmaio_commit(struct thread_data *td) +{ + struct rdmaio_data *rd = td->io_ops->data; + struct io_u **io_us; + int ret; + + if (!rd->io_us_queued) + return 0; + + io_us = rd->io_us_queued; + do { + /* RDMA_WRITE or RDMA_READ */ + if (rd->is_client) { + ret = fio_rdmaio_send(td, io_us, rd->io_u_queued_nr); + } else if (!rd->is_client) { + ret = fio_rdmaio_recv(td, io_us, rd->io_u_queued_nr); + } else + ret = 0; /* must be a SYNC */ + + if (ret > 0) { + fio_rdmaio_queued(td, io_us, ret); + io_u_mark_submit(td, ret); + rd->io_u_queued_nr -= ret; + io_us += ret; + ret = 0; + } else + break; + } while (rd->io_u_queued_nr); + + return ret; +} + +static int fio_rdmaio_connect(struct thread_data *td, struct fio_file *f) +{ + struct rdmaio_data *rd = td->io_ops->data; + struct rdma_conn_param conn_param; + struct ibv_send_wr *bad_wr; + + memset(&conn_param, 0, sizeof conn_param); + conn_param.responder_resources = 1; + conn_param.initiator_depth = 1; + conn_param.retry_count = 10; + + if (rdma_connect(rd->cm_id, &conn_param) != 0) { + log_err("fio: rdma_connect fail\n"); + return 1; + } + + if (get_next_channel_event + (td, rd->cm_channel, RDMA_CM_EVENT_ESTABLISHED) != 0) { + log_err("fio: wait for RDMA_CM_EVENT_ESTABLISHED\n"); + return 1; + } + + /* send task request */ + rd->send_buf.mode = htonl(rd->rdma_protocol); + rd->send_buf.nr = htonl(td->o.iodepth); + + if (ibv_post_send(rd->qp, &rd->sq_wr, &bad_wr) != 0) { + log_err("fio: ibv_post_send fail"); + return 1; + } + + rdma_poll_wait(td, IBV_WC_SEND); + + /* wait for remote MR info from server side */ + rdma_poll_wait(td, IBV_WC_RECV); + + return 0; +} + +static int fio_rdmaio_accept(struct thread_data *td, struct fio_file *f) 
+static int fio_rdmaio_accept(struct thread_data *td, struct fio_file *f)
+{
+	struct rdmaio_data *rd = td->io_ops->data;
+	struct rdma_conn_param conn_param;
+	struct ibv_send_wr *bad_wr;
+
+	/* rdma_accept() - then wait for accept success */
+	memset(&conn_param, 0, sizeof conn_param);
+	conn_param.responder_resources = 1;
+	conn_param.initiator_depth = 1;
+
+	if (rdma_accept(rd->child_cm_id, &conn_param) != 0) {
+		log_err("fio: rdma_accept fail\n");
+		return 1;
+	}
+
+	if (get_next_channel_event
+	    (td, rd->cm_channel, RDMA_CM_EVENT_ESTABLISHED) != 0) {
+		log_err("fio: wait for RDMA_CM_EVENT_ESTABLISHED\n");
+		return 1;
+	}
+
+	/* wait for request */
+	rdma_poll_wait(td, IBV_WC_RECV);
+
+	if (ibv_post_send(rd->qp, &rd->sq_wr, &bad_wr) != 0) {
+		log_err("fio: ibv_post_send fail\n");
+		return 1;
+	}
+
+	rdma_poll_wait(td, IBV_WC_SEND);
+
+	return 0;
+}
+
+static int fio_rdmaio_open_file(struct thread_data *td, struct fio_file *f)
+{
+	if (td_read(td))
+		return fio_rdmaio_accept(td, f);
+	else
+		return fio_rdmaio_connect(td, f);
+}
+
+static int fio_rdmaio_close_file(struct thread_data *td, struct fio_file *f)
+{
+	struct rdmaio_data *rd = td->io_ops->data;
+	struct ibv_send_wr *bad_wr;
+
+	/* unregister rdma buffer */
+
+	/*
+	 * Client sends notification to the server side
+	 */
+	/* refer to: http://linux.die.net/man/7/rdma_cm */
+	if ((rd->is_client == 1) && ((rd->rdma_protocol == FIO_RDMA_MEM_WRITE)
+				     || (rd->rdma_protocol ==
+					 FIO_RDMA_MEM_READ))) {
+		if (ibv_post_send(rd->qp, &rd->sq_wr, &bad_wr) != 0) {
+			log_err("fio: ibv_post_send fail\n");
+			return 1;
+		}
+
+		dprint(FD_IO, "fio: close information sent successfully\n");
+		rdma_poll_wait(td, IBV_WC_SEND);
+	}
+
+	if (rd->is_client == 1)
+		rdma_disconnect(rd->cm_id);
+	else {
+		rdma_disconnect(rd->child_cm_id);
+/*		rdma_disconnect(rd->cm_id); */
+	}
+
+/*	if (get_next_channel_event(td, rd->cm_channel, RDMA_CM_EVENT_DISCONNECTED) != 0) {
+		log_err("fio: wait for RDMA_CM_EVENT_DISCONNECTED\n");
+		return 1;
+	} */
+
+	ibv_destroy_qp(rd->qp);
+	ibv_destroy_cq(rd->cq);
+
+	if (rd->is_client == 1)
+		rdma_destroy_id(rd->cm_id);
+	else {
+		rdma_destroy_id(rd->child_cm_id);
+		rdma_destroy_id(rd->cm_id);
+	}
+
+	ibv_destroy_comp_channel(rd->channel);
+	ibv_dealloc_pd(rd->pd);
+
+	return 0;
+}
+
+static int fio_rdmaio_setup_connect(struct thread_data *td, const char *host,
+				    unsigned short port)
+{
+	struct rdmaio_data *rd = td->io_ops->data;
+	struct ibv_recv_wr *bad_wr;
+
+	rd->addr.sin_family = AF_INET;
+	rd->addr.sin_port = htons(port);
+
+	if (inet_aton(host, &rd->addr.sin_addr) != 1) {
+		struct hostent *hent;
+
+		hent = gethostbyname(host);
+		if (!hent) {
+			td_verror(td, errno, "gethostbyname");
+			return 1;
+		}
+
+		memcpy(&rd->addr.sin_addr, hent->h_addr, 4);
+	}
+
+	/* resolve addr */
+	if (rdma_resolve_addr(rd->cm_id, NULL,
+			      (struct sockaddr *)&rd->addr, 2000) != 0) {
+		log_err("fio: rdma_resolve_addr\n");
+		return 1;
+	}
+
+	if (get_next_channel_event
+	    (td, rd->cm_channel, RDMA_CM_EVENT_ADDR_RESOLVED)
+	    != 0) {
+		log_err("fio: get_next_channel_event\n");
+		return 1;
+	}
+
+	/* resolve route */
+	if (rdma_resolve_route(rd->cm_id, 2000) != 0) {
+		log_err("fio: rdma_resolve_route\n");
+		return 1;
+	}
+
+	if (get_next_channel_event
+	    (td, rd->cm_channel, RDMA_CM_EVENT_ROUTE_RESOLVED) != 0) {
+		log_err("fio: get_next_channel_event\n");
+		return 1;
+	}
+
+	/* create qp and buffer */
+	if (fio_rdmaio_setup_qp(td) != 0)
+		return 1;
+
+	if (fio_rdmaio_setup_control_msg_buffers(td) != 0)
+		return 1;
+
+	/* post recv buf */
+	if (ibv_post_recv(rd->qp, &rd->rq_wr, &bad_wr) != 0) {
+		log_err("fio: ibv_post_recv fail\n");
+		return 1;
+	}
+
+	return 0;
+}
+
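+/*
+ * Server-side setup: bind to INADDR_ANY on the given port, listen, and
+ * wait for the client's CONNECT_REQUEST before creating the QP and
+ * control message buffers.
+ */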
+static int fio_rdmaio_setup_listen(struct thread_data *td, short port)
+{
+	struct rdmaio_data *rd = td->io_ops->data;
+	struct ibv_recv_wr *bad_wr;
+
+	rd->addr.sin_family = AF_INET;
+	rd->addr.sin_addr.s_addr = htonl(INADDR_ANY);
+	rd->addr.sin_port = htons(port);
+
+	/* rdma_listen */
+	if (rdma_bind_addr(rd->cm_id, (struct sockaddr *)&rd->addr) != 0) {
+		log_err("fio: rdma_bind_addr fail\n");
+		return 1;
+	}
+
+	if (rdma_listen(rd->cm_id, 3) != 0) {
+		log_err("fio: rdma_listen fail\n");
+		return 1;
+	}
+
+	/* wait for CONNECT_REQUEST */
+	if (get_next_channel_event
+	    (td, rd->cm_channel, RDMA_CM_EVENT_CONNECT_REQUEST) != 0) {
+		log_err("fio: wait for RDMA_CM_EVENT_CONNECT_REQUEST\n");
+		return 1;
+	}
+
+	if (fio_rdmaio_setup_qp(td) != 0)
+		return 1;
+
+	if (fio_rdmaio_setup_control_msg_buffers(td) != 0)
+		return 1;
+
+	/* post recv buf */
+	if (ibv_post_recv(rd->qp, &rd->rq_wr, &bad_wr) != 0) {
+		log_err("fio: ibv_post_recv fail\n");
+		return 1;
+	}
+
+	return 0;
+}
+
+static int fio_rdmaio_init(struct thread_data *td)
+{
+	struct rdmaio_data *rd = td->io_ops->data;
+	unsigned int port;
+	char host[64], buf[128];
+	char *sep, *portp, *modep;
+	int ret;
+	struct rlimit rl;
+
+	if (td_rw(td)) {
+		log_err("fio: rdma connections must be read OR write\n");
+		return 1;
+	}
+	if (td_random(td)) {
+		log_err("fio: RDMA network IO can't be random\n");
+		return 1;
+	}
+
+	/* check RLIMIT_MEMLOCK */
+	if (getrlimit(RLIMIT_MEMLOCK, &rl) != 0) {
+		log_err("fio: getrlimit fail: %d(%s)\n",
+			errno, strerror(errno));
+		return 1;
+	}
+
+	/* soft limit */
+	if ((rl.rlim_cur != RLIM_INFINITY)
+	    && (rl.rlim_cur < td->orig_buffer_size)) {
+		log_err("fio: soft RLIMIT_MEMLOCK is: %ld\n", rl.rlim_cur);
+		log_err("fio: total block size is: %ld\n",
+			td->orig_buffer_size);
+		/* try to set larger RLIMIT_MEMLOCK */
+		rl.rlim_cur = rl.rlim_max;
+		if (setrlimit(RLIMIT_MEMLOCK, &rl) != 0) {
+			log_err("fio: setrlimit fail: %d(%s)\n",
+				errno, strerror(errno));
+			log_err("fio: you may try to enlarge the MEMLOCK limit as root:\n");
+			log_err("# ulimit -l unlimited\n");
+			return 1;
+		}
+	}
+
+	strcpy(buf, td->o.filename);
+
+	sep = strchr(buf, '/');
+	if (!sep)
+		goto bad_host;
+
+	*sep = '\0';
+	sep++;
+	strcpy(host, buf);
+	if (!strlen(host))
+		goto bad_host;
+
+	modep = NULL;
+	portp = sep;
+	sep = strchr(portp, '/');
+	if (sep) {
+		*sep = '\0';
+		modep = sep + 1;
+	}
+
+	port = strtol(portp, NULL, 10);
+	if (!port || port > 65535)
+		goto bad_host;
+
+	if (modep) {
+		if (!strncmp("rdma_write", modep, strlen(modep)) ||
+		    !strncmp("RDMA_WRITE", modep, strlen(modep)))
+			rd->rdma_protocol = FIO_RDMA_MEM_WRITE;
+		else if (!strncmp("rdma_read", modep, strlen(modep)) ||
+			 !strncmp("RDMA_READ", modep, strlen(modep)))
+			rd->rdma_protocol = FIO_RDMA_MEM_READ;
+		else if (!strncmp("send", modep, strlen(modep)) ||
+			 !strncmp("SEND", modep, strlen(modep)))
+			rd->rdma_protocol = FIO_RDMA_CHA_SEND;
+		else
+			goto bad_host;
+	} else
+		rd->rdma_protocol = FIO_RDMA_MEM_WRITE;
+
+	rd->cq_event_num = 0;
+
+	rd->cm_channel = rdma_create_event_channel();
+	if (!rd->cm_channel) {
+		log_err("fio: rdma_create_event_channel fail\n");
+		return 1;
+	}
+
+	ret = rdma_create_id(rd->cm_channel, &rd->cm_id, rd, RDMA_PS_TCP);
+	if (ret) {
+		log_err("fio: rdma_create_id fail\n");
+		return 1;
+	}
+
+	if ((rd->rdma_protocol == FIO_RDMA_MEM_WRITE) ||
+	    (rd->rdma_protocol == FIO_RDMA_MEM_READ)) {
+		rd->rmt_us =
+		    malloc(FIO_RDMA_MAX_IO_DEPTH * sizeof(struct remote_u));
+		memset(rd->rmt_us, 0,
+		       FIO_RDMA_MAX_IO_DEPTH * sizeof(struct remote_u));
+		rd->rmt_nr = 0;
+	}
+
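+	/* io_u bookkeeping lists: queued -> in flight -> completed */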
+	rd->io_us_queued = malloc(td->o.iodepth * sizeof(struct io_u *));
+	memset(rd->io_us_queued, 0, td->o.iodepth * sizeof(struct io_u *));
+	rd->io_u_queued_nr = 0;
+
+	rd->io_us_flight = malloc(td->o.iodepth * sizeof(struct io_u *));
+	memset(rd->io_us_flight, 0, td->o.iodepth * sizeof(struct io_u *));
+	rd->io_u_flight_nr = 0;
+
+	rd->io_us_completed = malloc(td->o.iodepth * sizeof(struct io_u *));
+	memset(rd->io_us_completed, 0, td->o.iodepth * sizeof(struct io_u *));
+	rd->io_u_completed_nr = 0;
+
+	if (td_read(td)) {	/* READ as the server */
+		rd->is_client = 0;
+		/* the server sets up its buffers after it gets the request */
+		ret = fio_rdmaio_setup_listen(td, port);
+	} else {		/* WRITE as the client */
+		rd->is_client = 1;
+		ret = fio_rdmaio_setup_connect(td, host, port);
+	}
+
+	struct flist_head *entry;
+	unsigned int max_bs;
+	max_bs = max(td->o.max_bs[DDIR_READ], td->o.max_bs[DDIR_WRITE]);
+	/* register each io_u in the free list */
+	int i = 0;
+	flist_for_each(entry, &td->io_u_freelist) {
+		struct io_u *io_u = flist_entry(entry, struct io_u, list);
+
+		io_u->engine_data = malloc(sizeof(struct rdma_io_u_data));
+		memset(io_u->engine_data, 0, sizeof(struct rdma_io_u_data));
+		((struct rdma_io_u_data *)io_u->engine_data)->wr_id = i;
+
+		io_u->mr = ibv_reg_mr(rd->pd, io_u->buf, max_bs,
+				      IBV_ACCESS_LOCAL_WRITE |
+				      IBV_ACCESS_REMOTE_READ |
+				      IBV_ACCESS_REMOTE_WRITE);
+		if (io_u->mr == NULL) {
+			log_err("fio: ibv_reg_mr io_u failed\n");
+			return 1;
+		}
+
+		rd->send_buf.rmt_us[i].buf =
+		    htonll((uint64_t) (unsigned long)io_u->buf);
+		rd->send_buf.rmt_us[i].rkey = htonl(io_u->mr->rkey);
+		rd->send_buf.rmt_us[i].size = htonl(max_bs);
+
+/*		log_info("fio: Send rkey %x addr %" PRIx64 " len %d to client\n",
+			 io_u->mr->rkey, io_u->buf, max_bs); */
+		i++;
+	}
+
+	rd->send_buf.nr = htonl(i);
+
+	return ret;
+bad_host:
+	log_err("fio: bad rdma host/port/protocol: %s\n", td->o.filename);
+	return 1;
+}
+
+static void fio_rdmaio_cleanup(struct thread_data *td)
+{
+	struct rdmaio_data *rd = td->io_ops->data;
+
+	if (rd) {
+/*		if (nd->listenfd != -1)
+			close(nd->listenfd);
+		if (nd->pipes[0] != -1)
+			close(nd->pipes[0]);
+		if (nd->pipes[1] != -1)
+			close(nd->pipes[1]);
+*/
+		free(rd);
+	}
+}
+
+static int fio_rdmaio_setup(struct thread_data *td)
+{
+	struct rdmaio_data *rd;
+
+	if (!td->io_ops->data) {
+		rd = malloc(sizeof(*rd));
+
+		memset(rd, 0, sizeof(*rd));
+		td->io_ops->data = rd;
+	}
+
+	return 0;
+}
+
+static struct ioengine_ops ioengine_rw = {
+	.name		= "rdma",
+	.version	= FIO_IOOPS_VERSION,
+	.setup		= fio_rdmaio_setup,
+	.init		= fio_rdmaio_init,
+	.prep		= fio_rdmaio_prep,
+	.queue		= fio_rdmaio_queue,
+	.commit		= fio_rdmaio_commit,
+	.getevents	= fio_rdmaio_getevents,
+	.event		= fio_rdmaio_event,
+	.cleanup	= fio_rdmaio_cleanup,
+	.open_file	= fio_rdmaio_open_file,
+	.close_file	= fio_rdmaio_close_file,
+	.flags		= FIO_DISKLESSIO | FIO_UNIDIR | FIO_PIPEIO,
+};
+
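+/*
+ * Built without -DFIO_HAVE_RDMA: register a stub engine that fails at
+ * init time and points the user at the build instructions above.
+ */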
+#else /* FIO_HAVE_RDMA */
+
+static int fio_rdmaio_open_file(struct thread_data *td, struct fio_file *f)
+{
+	return 0;
+}
+
+static int fio_rdmaio_close_file(struct thread_data *td, struct fio_file *f)
+{
+	return 0;
+}
+
+static int fio_rdmaio_queue(struct thread_data *td, struct io_u *io_u)
+{
+	return FIO_Q_COMPLETED;
+}
+
+static int fio_rdmaio_init(struct thread_data fio_unused * td)
+{
+	log_err("fio: rdma(librdmacm libibverbs) not available\n");
+	log_err("     You haven't compiled the rdma ioengine into fio.\n");
+	log_err("     If you want to try the rdma ioengine,\n");
+	log_err("     make sure OFED is installed,\n");
+	log_err("     $ ofed_info\n");
+	log_err("     then rebuild fio as follows:\n");
+	log_err("     $ export EXTFLAGS=\"-DFIO_HAVE_RDMA\"\n");
+	log_err("     $ export EXTLIBS=\"-libverbs -lrdmacm\"\n");
+	log_err("     $ make clean && make\n");
+	return 1;
+}
+
+static struct ioengine_ops ioengine_rw = {
+	.name		= "rdma",
+	.version	= FIO_IOOPS_VERSION,
+	.init		= fio_rdmaio_init,
+	.queue		= fio_rdmaio_queue,
+	.open_file	= fio_rdmaio_open_file,
+	.close_file	= fio_rdmaio_close_file,
+	.flags		= FIO_SYNCIO | FIO_DISKLESSIO | FIO_UNIDIR | FIO_PIPEIO,
+};
+
+#endif
+
+static void fio_init fio_rdmaio_register(void)
+{
+	register_ioengine(&ioengine_rw);
+}
+
+static void fio_exit fio_rdmaio_unregister(void)
+{
+	unregister_ioengine(&ioengine_rw);
+}
diff --git a/examples/rdmaio-client b/examples/rdmaio-client
new file mode 100644
index 00000000..7c660c9f
--- /dev/null
+++ b/examples/rdmaio-client
@@ -0,0 +1,11 @@
+# Example rdma client job
+[global]
+ioengine=rdma
+filename=[ip_addr]/[port]/[RDMA_WRITE/RDMA_READ/SEND]
+bs=1m
+size=100g
+
+[sender]
+rw=write
+iodepth=1
+iodepth_batch_complete=1
\ No newline at end of file
diff --git a/examples/rdmaio-server b/examples/rdmaio-server
new file mode 100644
index 00000000..93488591
--- /dev/null
+++ b/examples/rdmaio-server
@@ -0,0 +1,10 @@
+# Example rdma server job
+[global]
+ioengine=rdma
+filename=[ip_addr]/[port]
+bs=1m
+size=100g
+
+[receiver]
+rw=read
+iodepth=16
\ No newline at end of file
diff --git a/fio.1 b/fio.1
index b7046d5a..e48a16a1 100644
--- a/fio.1
+++ b/fio.1
@@ -410,6 +410,10 @@ approach to asycnronous I/O.
 .br
 See .
 .TP
+.B rdma
+The RDMA I/O engine supports both RDMA memory semantics (RDMA_WRITE/RDMA_READ)
+and channel semantics (Send/Recv) in InfiniBand, RoCE and iWarp environments.
+.TP
 .B external
 Loads an external I/O engine object file. Append the engine filename as
 `:\fIenginepath\fR'.
diff --git a/ioengine.h b/ioengine.h
index c56bd505..75c2c1ad 100644
--- a/ioengine.h
+++ b/ioengine.h
@@ -35,6 +35,9 @@ struct io_u {
 #endif
 #ifdef FIO_HAVE_BINJECT
 	struct b_user_cmd buc;
+#endif
+#ifdef FIO_HAVE_RDMA
+	struct ibv_mr *mr;
 #endif
 	void *mmap_data;
 };
diff --git a/options.c b/options.c
index 74c64780..ffe54709 100644
--- a/options.c
+++ b/options.c
@@ -1002,6 +1002,11 @@ static struct fio_option options[FIO_MAX_OPTS] = {
 			  { .ival = "binject",
 			    .help = "binject direct inject block engine",
 			  },
+#endif
+#ifdef FIO_HAVE_RDMA
+			  { .ival = "rdma",
+			    .help = "RDMA IO engine",
+			  },
 #endif
 			  { .ival = "external",
 			    .help = "Load external engine (append name)",
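
Example run, with a hypothetical server address of 192.168.0.10 and port
8888 (start the server job first, since the client's rdma_connect()
needs a listener on the other side):

  server$ fio examples/rdmaio-server    # filename=192.168.0.10/8888
  client$ fio examples/rdmaio-client    # filename=192.168.0.10/8888/RDMA_WRITE

READ jobs take the accepting/server role and WRITE jobs the
connecting/client role (see fio_rdmaio_open_file()), which is why the
server example uses rw=read and the client rw=write.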