From: Jens Axboe Date: Wed, 11 Aug 2021 22:54:23 +0000 (-0600) Subject: t/io_uring: allow multiple IO threads X-Git-Tag: fio-3.28~25 X-Git-Url: https://git.kernel.dk/?a=commitdiff_plain;h=543196617ce45b6a04fb039a3b9c6d06c9b58309;p=fio.git t/io_uring: allow multiple IO threads If you do: t/io_uring -n2 /dev/dev1 /dev/dev2 then t/io_uring will create two IO threads, and each one will get a file/device assigned. In the above example, thread 1 will run on dev1, thread 2 on dev2. Note that for now, you'll need at least as many files as threads. Adding support for adding the same file set over the specified threads (if we have less files than threads) is left as an exercise for the reader. You know where to send the patches. Signed-off-by: Jens Axboe --- diff --git a/t/io_uring.c b/t/io_uring.c index ff4c7a7c..538cc7d4 100644 --- a/t/io_uring.c +++ b/t/io_uring.c @@ -62,6 +62,7 @@ struct file { struct submitter { pthread_t thread; int ring_fd; + int index; struct io_sq_ring sq_ring; struct io_uring_sqe *sqes; struct io_cq_ring cq_ring; @@ -93,6 +94,7 @@ static int buffered = 0; /* use buffered IO, not O_DIRECT */ static int sq_thread_poll = 0; /* use kernel submission/poller thread */ static int sq_thread_cpu = -1; /* pin above thread to this CPU */ static int do_nop = 0; /* no-op SQ ring commands */ +static int nthreads = 1; static int vectored = 1; @@ -404,10 +406,25 @@ submit: return NULL; } +static struct submitter *get_submitter(int offset) +{ + void *ret; + + ret = submitter; + if (offset) + ret += offset * (sizeof(*submitter) + depth * sizeof(struct iovec)); + return ret; +} + static void sig_int(int sig) { + int j; + printf("Exiting on signal %d\n", sig); - submitter->finish = 1; + for (j = 0; j < nthreads; j++) { + struct submitter *s = get_submitter(j); + s->finish = 1; + } finish = 1; } @@ -498,19 +515,22 @@ static int setup_ring(struct submitter *s) static void file_depths(char *buf) { - struct submitter *s = submitter; char *p; - int i; + int i, j; buf[0] = '\0'; p = buf; - for (i = 0; i < s->nr_files; i++) { - struct file *f = &s->files[i]; + for (j = 0; j < nthreads; j++) { + struct submitter *s = get_submitter(j); - if (i + 1 == s->nr_files) - p += sprintf(p, "%d", f->pending_ios); - else - p += sprintf(p, "%d, ", f->pending_ios); + for (i = 0; i < s->nr_files; i++) { + struct file *f = &s->files[i]; + + if (i + 1 == s->nr_files) + p += sprintf(p, "%d", f->pending_ios); + else + p += sprintf(p, "%d, ", f->pending_ios); + } } } @@ -530,7 +550,7 @@ int main(int argc, char *argv[]) { struct submitter *s; unsigned long done, calls, reap; - int err, i, flags, fd, opt; + int err, i, j, flags, fd, opt; char *fdepths; void *ret; @@ -539,7 +559,7 @@ int main(int argc, char *argv[]) return 1; } - while ((opt = getopt(argc, argv, "d:s:c:b:p:B:F:h?")) != -1) { + while ((opt = getopt(argc, argv, "d:s:c:b:p:B:F:n:h?")) != -1) { switch (opt) { case 'd': depth = atoi(optarg); @@ -562,6 +582,9 @@ int main(int argc, char *argv[]) case 'F': register_files = !!atoi(optarg); break; + case 'n': + nthreads = atoi(optarg); + break; case 'h': case '?': default: @@ -570,18 +593,25 @@ int main(int argc, char *argv[]) } } - submitter = malloc(sizeof(*submitter) + depth * sizeof(struct iovec)); - memset(submitter, 0, sizeof(*submitter) + depth * sizeof(struct iovec)); - s = submitter; + submitter = calloc(nthreads, sizeof(*submitter) + + depth * sizeof(struct iovec)); + for (j = 0; j < nthreads; j++) { + s = get_submitter(j); + s->index = j; + s->done = s->calls = s->reaps = 0; + } flags = O_RDONLY | O_NOATIME; if (!buffered) flags |= O_DIRECT; + j = 0; i = optind; + printf("i %d, argc %d\n", i, argc); while (!do_nop && i < argc) { struct file *f; + s = get_submitter(j); if (s->nr_files == MAX_FDS) { printf("Max number of files (%d) reached\n", MAX_FDS); break; @@ -604,9 +634,11 @@ int main(int argc, char *argv[]) } f->max_blocks--; - printf("Added file %s\n", argv[i]); + printf("Added file %s (submitter %d)\n", argv[i], s->index); s->nr_files++; i++; + if (++j >= nthreads) + j = 0; } if (fixedbufs) { @@ -622,28 +654,39 @@ int main(int argc, char *argv[]) arm_sig_int(); - for (i = 0; i < depth; i++) { - void *buf; + for (j = 0; j < nthreads; j++) { + s = get_submitter(j); + for (i = 0; i < depth; i++) { + void *buf; - if (posix_memalign(&buf, bs, bs)) { - printf("failed alloc\n"); - return 1; + if (posix_memalign(&buf, bs, bs)) { + printf("failed alloc\n"); + return 1; + } + s->iovecs[i].iov_base = buf; + s->iovecs[i].iov_len = bs; } - s->iovecs[i].iov_base = buf; - s->iovecs[i].iov_len = bs; } - err = setup_ring(s); - if (err) { - printf("ring setup failed: %s, %d\n", strerror(errno), err); - return 1; + for (j = 0; j < nthreads; j++) { + s = get_submitter(j); + + err = setup_ring(s); + if (err) { + printf("ring setup failed: %s, %d\n", strerror(errno), err); + return 1; + } } + s = get_submitter(0); printf("polled=%d, fixedbufs=%d, register_files=%d, buffered=%d", polled, fixedbufs, register_files, buffered); printf(" QD=%d, sq_ring=%d, cq_ring=%d\n", depth, *s->sq_ring.ring_entries, *s->cq_ring.ring_entries); - pthread_create(&s->thread, NULL, submitter_fn, s); + for (j = 0; j < nthreads; j++) { + s = get_submitter(j); + pthread_create(&s->thread, NULL, submitter_fn, s); + } - fdepths = malloc(8 * s->nr_files); + fdepths = malloc(8 * s->nr_files * nthreads); reap = calls = done = 0; do { unsigned long this_done = 0; @@ -652,9 +695,11 @@ int main(int argc, char *argv[]) unsigned long rpc = 0, ipc = 0; sleep(1); - this_done += s->done; - this_call += s->calls; - this_reap += s->reaps; + for (j = 0; j < nthreads; j++) { + this_done += s->done; + this_call += s->calls; + this_reap += s->reaps; + } if (this_call - calls) { rpc = (this_done - done) / (this_call - calls); ipc = (this_reap - reap) / (this_call - calls); @@ -669,8 +714,12 @@ int main(int argc, char *argv[]) reap = this_reap; } while (!finish); - pthread_join(s->thread, &ret); - close(s->ring_fd); + for (j = 0; j < nthreads; j++) { + s = get_submitter(j); + pthread_join(s->thread, &ret); + close(s->ring_fd); + } free(fdepths); + free(submitter); return 0; }