+ s->inflight -= reaped;
+ s->done += reaped;
+ return reaped;
+}
+
+ /*
+  * Thread entry point for the AIO (io_submit/io_getevents) submission path.
+  *
+  * @data: the struct submitter this thread drives.
+  *
+  * Allocates depth-sized iocb, iocb-pointer and io_event arrays, then loops
+  * until s->finish is set: prepare a batch of I/Os, submit them, and wait for
+  * completions when required. A failed or short io_submit() ends the loop.
+  * A failed or short io_getevents() only leaves the inner wait loop; the
+  * outer loop keeps running.
+  *
+  * Always returns NULL. On exit the scratch arrays are freed and the
+  * file-scope `finish` flag is set to 1.
+  */
+ static void *submitter_aio_fn(void *data)
+ {
+ 	struct submitter *s = data;
+ 	int i, ret, prepped;
+ 	struct iocb **iocbsptr;
+ 	struct iocb *iocbs;
+ 	struct io_event *events;
+ #ifdef ARCH_HAVE_CPU_CLOCK
+ 	/* nr_batch is only used to wrap the clock_batch index below */
+ 	int nr_batch = submitter_init(s);
+ #else
+ 	submitter_init(s);
+ #endif
+
+ 	iocbsptr = calloc(depth, sizeof(*iocbsptr));
+ 	iocbs = calloc(depth, sizeof(*iocbs));
+ 	events = calloc(depth, sizeof(*events));
+ 	/*
+ 	 * Bail out before touching the arrays if any allocation failed;
+ 	 * free(NULL) is a no-op, so the shared cleanup path is safe.
+ 	 */
+ 	if (!iocbsptr || !iocbs || !events) {
+ 		perror("calloc");
+ 		goto out;
+ 	}
+
+ 	/* io_submit() takes an array of pointers; point each at its iocb */
+ 	for (i = 0; i < depth; i++)
+ 		iocbsptr[i] = &iocbs[i];
+
+ 	prepped = 0;
+ 	do {
+ 		int to_wait, to_submit, to_prep;
+
+ 		/* Only prepare a new batch once the previous one was submitted */
+ 		if (!prepped && s->inflight < depth) {
+ 			to_prep = min(depth - s->inflight, batch_submit);
+ 			prepped = prep_more_ios_aio(s, to_prep, iocbs);
+ #ifdef ARCH_HAVE_CPU_CLOCK
+ 			if (prepped && stats) {
+ 				s->clock_batch[s->clock_index] = get_cpu_clock();
+ 				/* mask-based wrap: assumes nr_batch is a power of two — TODO confirm */
+ 				s->clock_index = (s->clock_index + 1) & (nr_batch - 1);
+ 			}
+ #endif
+ 		}
+ 		s->inflight += prepped;
+ 		to_submit = prepped;
+
+ 		/*
+ 		 * Skip waiting when there is something to submit and room is
+ 		 * left; otherwise wait for up to batch_complete events.
+ 		 */
+ 		if (to_submit && (s->inflight + to_submit <= depth))
+ 			to_wait = 0;
+ 		else
+ 			to_wait = min(s->inflight + to_submit, batch_complete);
+
+ 		ret = io_submit(s->aio_ctx, to_submit, iocbsptr);
+ 		s->calls++;
+ 		if (ret < 0) {
+ 			perror("io_submit");
+ 			break;
+ 		} else if (ret != to_submit) {
+ 			printf("submitted %d, wanted %d\n", ret, to_submit);
+ 			break;
+ 		}
+ 		prepped = 0;
+
+ 		while (to_wait) {
+ 			int r;
+
+ 			s->calls++;
+ 			/* min_nr == nr: ask for exactly to_wait events */
+ 			r = io_getevents(s->aio_ctx, to_wait, to_wait, events, NULL);
+ 			if (r < 0) {
+ 				perror("io_getevents");
+ 				break;
+ 			} else if (r != to_wait) {
+ 				printf("r=%d, wait=%d\n", r, to_wait);
+ 				break;
+ 			}
+ 			r = reap_events_aio(s, events, r);
+ 			s->reaps += r;
+ 			to_wait -= r;
+ 		}
+ 	} while (!s->finish);
+
+ out:
+ 	free(iocbsptr);
+ 	free(iocbs);
+ 	free(events);
+ 	/* NOTE(review): presumably signals the rest of the program to stop — confirm */
+ 	finish = 1;
+ 	return NULL;
+ }
+#endif
+
+ /*
+  * Issue IORING_UNREGISTER_RING_FDS through the io_uring_register syscall
+  * on s->ring_fd, passing s->enter_ring_fd as the offset to release.
+  * The syscall result is not examined.
+  */
+ static void io_uring_unregister_ring(struct submitter *s)
+ {
+ 	struct io_uring_rsrc_update unreg = { 0 };
+
+ 	unreg.offset = s->enter_ring_fd;
+
+ 	syscall(__NR_io_uring_register, s->ring_fd,
+ 		IORING_UNREGISTER_RING_FDS, &unreg, 1);
+ }
+
+ /*
+  * Issue IORING_REGISTER_RING_FDS through the io_uring_register syscall for
+  * s->ring_fd.
+  *
+  * The request carries the ring fd in .data and -1U in .offset
+  * (NOTE(review): presumably "let the kernel choose the slot" — confirm
+  * against the io_uring_register documentation).
+  *
+  * Returns 0 when exactly one entry was registered, in which case the offset
+  * written back by the call is stored in s->enter_ring_fd. Otherwise clears
+  * the file-scope register_ring flag and returns -1.
+  */
+ static int io_uring_register_ring(struct submitter *s)
+ {
+ 	struct io_uring_rsrc_update reg = { 0 };
+ 	int nr_registered;
+
+ 	reg.data = s->ring_fd;
+ 	reg.offset = -1U;
+
+ 	nr_registered = syscall(__NR_io_uring_register, s->ring_fd,
+ 				IORING_REGISTER_RING_FDS, &reg, 1);
+ 	if (nr_registered != 1) {
+ 		/* registration failed: turn the feature off */
+ 		register_ring = 0;
+ 		return -1;
+ 	}
+
+ 	s->enter_ring_fd = reg.offset;
+ 	return 0;
+ }
+
+static void *submitter_uring_fn(void *data)
+{
+ struct submitter *s = data;
+ struct io_sq_ring *ring = &s->sq_ring;
+ int ret, prepped;
+#ifdef ARCH_HAVE_CPU_CLOCK
+ int nr_batch = submitter_init(s);
+#else
+ submitter_init(s);
+#endif
+
+ if (register_ring)
+ io_uring_register_ring(s);
+
+ prepped = 0;
+ do {
+ int to_wait, to_submit, this_reap, to_prep;
+ unsigned ring_flags = 0;
+
+ if (!prepped && s->inflight < depth) {
+ to_prep = min(depth - s->inflight, batch_submit);
+ prepped = prep_more_ios_uring(s, to_prep);
+#ifdef ARCH_HAVE_CPU_CLOCK
+ if (prepped && stats) {
+ s->clock_batch[s->clock_index] = get_cpu_clock();
+ s->clock_index = (s->clock_index + 1) & (nr_batch - 1);
+ }
+#endif
+ }
+ s->inflight += prepped;
+submit_more:
+ to_submit = prepped;
+submit:
+ if (to_submit && (s->inflight + to_submit <= depth))
+ to_wait = 0;
+ else
+ to_wait = min(s->inflight + to_submit, batch_complete);
+
+ /*
+ * Only need to call io_uring_enter if we're not using SQ thread
+ * poll, or if IORING_SQ_NEED_WAKEUP is set.
+ */
+ if (sq_thread_poll)
+ ring_flags = atomic_load_acquire(ring->flags);
+ if (!sq_thread_poll || ring_flags & IORING_SQ_NEED_WAKEUP) {
+ unsigned flags = 0;
+
+ if (to_wait)
+ flags = IORING_ENTER_GETEVENTS;
+ if (ring_flags & IORING_SQ_NEED_WAKEUP)
+ flags |= IORING_ENTER_SQ_WAKEUP;
+ ret = io_uring_enter(s, to_submit, to_wait, flags);
+ s->calls++;
+ } else {
+ /* for SQPOLL, we submitted it all effectively */
+ ret = to_submit;
+ }
+
+ /*
+ * For non SQ thread poll, we already got the events we needed
+ * through the io_uring_enter() above. For SQ thread poll, we
+ * need to loop here until we find enough events.
+ */
+ this_reap = 0;
+ do {
+ int r;
+
+ if (pt)
+ r = reap_events_uring_pt(s);