If you do:
t/io_uring -n2 /dev/dev1 /dev/dev2
then t/io_uring will create two IO threads, and each one will get
a file/device assigned. In the above example, thread 1 will run
on dev1, thread 2 on dev2.
Note that for now, you'll need at least as many files as threads.
Support for spreading the same file set across the specified
threads (if we have fewer files than threads) is left as an
exercise for the reader. You know where to send the patches.
Signed-off-by: Jens Axboe <axboe@kernel.dk>
struct submitter {
pthread_t thread;
int ring_fd;
struct submitter {
pthread_t thread;
int ring_fd;
struct io_sq_ring sq_ring;
struct io_uring_sqe *sqes;
struct io_cq_ring cq_ring;
struct io_sq_ring sq_ring;
struct io_uring_sqe *sqes;
struct io_cq_ring cq_ring;
static int sq_thread_poll = 0; /* use kernel submission/poller thread */
static int sq_thread_cpu = -1; /* pin above thread to this CPU */
static int do_nop = 0; /* no-op SQ ring commands */
static int sq_thread_poll = 0; /* use kernel submission/poller thread */
static int sq_thread_cpu = -1; /* pin above thread to this CPU */
static int do_nop = 0; /* no-op SQ ring commands */
+static int nthreads = 1;
+/*
+ * Return the submitter at index 'offset' within the 'submitter' allocation.
+ *
+ * Each element occupies sizeof(*submitter) plus depth * sizeof(struct iovec)
+ * bytes, so ordinary array indexing cannot be used; the address is computed
+ * from that per-element byte stride instead. Index 0 yields 'submitter'
+ * itself.
+ */
+static struct submitter *get_submitter(int offset)
+{
+	size_t stride = sizeof(*submitter) + depth * sizeof(struct iovec);
+
+	/* Step through a char pointer: arithmetic on void * is a GNU extension. */
+	return (struct submitter *)((char *)submitter + offset * stride);
+}
+
static void sig_int(int sig)
{
static void sig_int(int sig)
{
printf("Exiting on signal %d\n", sig);
printf("Exiting on signal %d\n", sig);
+ for (j = 0; j < nthreads; j++) {
+ struct submitter *s = get_submitter(j);
+ s->finish = 1;
+ }
static void file_depths(char *buf)
{
static void file_depths(char *buf)
{
- struct submitter *s = submitter;
- for (i = 0; i < s->nr_files; i++) {
- struct file *f = &s->files[i];
+ for (j = 0; j < nthreads; j++) {
+ struct submitter *s = get_submitter(j);
- if (i + 1 == s->nr_files)
- p += sprintf(p, "%d", f->pending_ios);
- else
- p += sprintf(p, "%d, ", f->pending_ios);
+ for (i = 0; i < s->nr_files; i++) {
+ struct file *f = &s->files[i];
+
+ if (i + 1 == s->nr_files)
+ p += sprintf(p, "%d", f->pending_ios);
+ else
+ p += sprintf(p, "%d, ", f->pending_ios);
+ }
{
struct submitter *s;
unsigned long done, calls, reap;
{
struct submitter *s;
unsigned long done, calls, reap;
- int err, i, flags, fd, opt;
+ int err, i, j, flags, fd, opt;
char *fdepths;
void *ret;
char *fdepths;
void *ret;
- while ((opt = getopt(argc, argv, "d:s:c:b:p:B:F:h?")) != -1) {
+ while ((opt = getopt(argc, argv, "d:s:c:b:p:B:F:n:h?")) != -1) {
switch (opt) {
case 'd':
depth = atoi(optarg);
switch (opt) {
case 'd':
depth = atoi(optarg);
case 'F':
register_files = !!atoi(optarg);
break;
case 'F':
register_files = !!atoi(optarg);
break;
+ case 'n':
+ nthreads = atoi(optarg);
+ break;
case 'h':
case '?':
default:
case 'h':
case '?':
default:
- submitter = malloc(sizeof(*submitter) + depth * sizeof(struct iovec));
- memset(submitter, 0, sizeof(*submitter) + depth * sizeof(struct iovec));
- s = submitter;
+ submitter = calloc(nthreads, sizeof(*submitter) +
+ depth * sizeof(struct iovec));
+ for (j = 0; j < nthreads; j++) {
+ s = get_submitter(j);
+ s->index = j;
+ s->done = s->calls = s->reaps = 0;
+ }
flags = O_RDONLY | O_NOATIME;
if (!buffered)
flags |= O_DIRECT;
flags = O_RDONLY | O_NOATIME;
if (!buffered)
flags |= O_DIRECT;
+ printf("i %d, argc %d\n", i, argc);
while (!do_nop && i < argc) {
struct file *f;
while (!do_nop && i < argc) {
struct file *f;
if (s->nr_files == MAX_FDS) {
printf("Max number of files (%d) reached\n", MAX_FDS);
break;
if (s->nr_files == MAX_FDS) {
printf("Max number of files (%d) reached\n", MAX_FDS);
break;
- printf("Added file %s\n", argv[i]);
+ printf("Added file %s (submitter %d)\n", argv[i], s->index);
+ if (++j >= nthreads)
+ j = 0;
- for (i = 0; i < depth; i++) {
- void *buf;
+ for (j = 0; j < nthreads; j++) {
+ s = get_submitter(j);
+ for (i = 0; i < depth; i++) {
+ void *buf;
- if (posix_memalign(&buf, bs, bs)) {
- printf("failed alloc\n");
- return 1;
+ if (posix_memalign(&buf, bs, bs)) {
+ printf("failed alloc\n");
+ return 1;
+ }
+ s->iovecs[i].iov_base = buf;
+ s->iovecs[i].iov_len = bs;
- s->iovecs[i].iov_base = buf;
- s->iovecs[i].iov_len = bs;
- err = setup_ring(s);
- if (err) {
- printf("ring setup failed: %s, %d\n", strerror(errno), err);
- return 1;
+ for (j = 0; j < nthreads; j++) {
+ s = get_submitter(j);
+
+ err = setup_ring(s);
+ if (err) {
+ printf("ring setup failed: %s, %d\n", strerror(errno), err);
+ return 1;
+ }
printf("polled=%d, fixedbufs=%d, register_files=%d, buffered=%d", polled, fixedbufs, register_files, buffered);
printf(" QD=%d, sq_ring=%d, cq_ring=%d\n", depth, *s->sq_ring.ring_entries, *s->cq_ring.ring_entries);
printf("polled=%d, fixedbufs=%d, register_files=%d, buffered=%d", polled, fixedbufs, register_files, buffered);
printf(" QD=%d, sq_ring=%d, cq_ring=%d\n", depth, *s->sq_ring.ring_entries, *s->cq_ring.ring_entries);
- pthread_create(&s->thread, NULL, submitter_fn, s);
+ for (j = 0; j < nthreads; j++) {
+ s = get_submitter(j);
+ pthread_create(&s->thread, NULL, submitter_fn, s);
+ }
- fdepths = malloc(8 * s->nr_files);
+ fdepths = malloc(8 * s->nr_files * nthreads);
reap = calls = done = 0;
do {
unsigned long this_done = 0;
reap = calls = done = 0;
do {
unsigned long this_done = 0;
unsigned long rpc = 0, ipc = 0;
sleep(1);
unsigned long rpc = 0, ipc = 0;
sleep(1);
- this_done += s->done;
- this_call += s->calls;
- this_reap += s->reaps;
+ for (j = 0; j < nthreads; j++) {
+ this_done += s->done;
+ this_call += s->calls;
+ this_reap += s->reaps;
+ }
if (this_call - calls) {
rpc = (this_done - done) / (this_call - calls);
ipc = (this_reap - reap) / (this_call - calls);
if (this_call - calls) {
rpc = (this_done - done) / (this_call - calls);
ipc = (this_reap - reap) / (this_call - calls);
reap = this_reap;
} while (!finish);
reap = this_reap;
} while (!finish);
- pthread_join(s->thread, &ret);
- close(s->ring_fd);
+ for (j = 0; j < nthreads; j++) {
+ s = get_submitter(j);
+ pthread_join(s->thread, &ret);
+ close(s->ring_fd);
+ }