projects
/
fio.git
/ blobdiff
commit
grep
author
committer
pickaxe
?
search:
re
summary
|
shortlog
|
log
|
commit
|
commitdiff
|
tree
raw
|
inline
| side by side
travis.yml: ensure we have libaio-dev and numa dev libs
[fio.git]
/
workqueue.c
diff --git
a/workqueue.c
b/workqueue.c
index 9fe7bec1a7cd6f874ee5185ebbdab9b78a6b8144..6e67f3e7df4dc8ac0910a2c96ef28d39d9e04bb2 100644
(file)
--- a/
workqueue.c
+++ b/
workqueue.c
@@
-1,5
+1,5
@@
/*
/*
- *
Rated submission helpers
+ *
Generic workqueue offload mechanism
*
* Copyright (C) 2015 Jens Axboe <axboe@kernel.dk>
*
*
* Copyright (C) 2015 Jens Axboe <axboe@kernel.dk>
*
@@
-9,7
+9,6
@@
#include "fio.h"
#include "flist.h"
#include "workqueue.h"
#include "fio.h"
#include "flist.h"
#include "workqueue.h"
-#include "lib/getrusage.h"
enum {
SW_F_IDLE = 1 << 0,
enum {
SW_F_IDLE = 1 << 0,
@@
-99,23
+98,20
@@
void workqueue_flush(struct workqueue *wq)
/*
* Must be serialized by caller. Returns true for queued, false for busy.
*/
/*
* Must be serialized by caller. Returns true for queued, false for busy.
*/
-
bool
workqueue_enqueue(struct workqueue *wq, struct workqueue_work *work)
+
void
workqueue_enqueue(struct workqueue *wq, struct workqueue_work *work)
{
struct submit_worker *sw;
sw = get_submit_worker(wq);
{
struct submit_worker *sw;
sw = get_submit_worker(wq);
- if (sw) {
- pthread_mutex_lock(&sw->lock);
- flist_add_tail(&work->list, &sw->work_list);
- sw->seq = ++wq->work_seq;
- sw->flags &= ~SW_F_IDLE;
- pthread_mutex_unlock(&sw->lock);
+ assert(sw);
- pthread_cond_signal(&sw->cond);
- return true;
- }
+ pthread_mutex_lock(&sw->lock);
+ flist_add_tail(&work->list, &sw->work_list);
+ sw->seq = ++wq->work_seq;
+ sw->flags &= ~SW_F_IDLE;
+ pthread_mutex_unlock(&sw->lock);
-
return false
;
+
pthread_cond_signal(&sw->cond)
;
}
static void handle_list(struct submit_worker *sw, struct flist_head *list)
}
static void handle_list(struct submit_worker *sw, struct flist_head *list)
@@
-134,10
+130,21
@@
static void *worker_thread(void *data)
{
struct submit_worker *sw = data;
struct workqueue *wq = sw->wq;
{
struct submit_worker *sw = data;
struct workqueue *wq = sw->wq;
- unsigned int eflags = 0, ret;
+ unsigned int eflags = 0, ret
= 0
;
FLIST_HEAD(local_list);
FLIST_HEAD(local_list);
- ret = workqueue_init_worker(sw);
+ sk_out_assign(sw->sk_out);
+
+ if (wq->ops.nice) {
+ if (nice(wq->ops.nice) < 0) {
+ log_err("workqueue: nice %s\n", strerror(errno));
+ ret = 1;
+ }
+ }
+
+ if (!ret)
+ ret = workqueue_init_worker(sw);
+
pthread_mutex_lock(&sw->lock);
sw->flags |= SW_F_RUNNING;
if (ret)
pthread_mutex_lock(&sw->lock);
sw->flags |= SW_F_RUNNING;
if (ret)
@@
-201,6
+208,7
@@
done:
pthread_mutex_lock(&sw->lock);
sw->flags |= (SW_F_EXITED | eflags);
pthread_mutex_unlock(&sw->lock);
pthread_mutex_lock(&sw->lock);
sw->flags |= (SW_F_EXITED | eflags);
pthread_mutex_unlock(&sw->lock);
+ sk_out_drop();
return NULL;
}
return NULL;
}
@@
-229,6
+237,9
@@
void workqueue_exit(struct workqueue *wq)
struct submit_worker *sw;
int i;
struct submit_worker *sw;
int i;
+ if (!wq->workers)
+ return;
+
for (i = 0; i < wq->max_workers; i++) {
sw = &wq->workers[i];
for (i = 0; i < wq->max_workers; i++) {
sw = &wq->workers[i];
@@
-253,12
+264,14
@@
void workqueue_exit(struct workqueue *wq)
} while (shutdown && shutdown != wq->max_workers);
free(wq->workers);
} while (shutdown && shutdown != wq->max_workers);
free(wq->workers);
+ wq->workers = NULL;
pthread_mutex_destroy(&wq->flush_lock);
pthread_cond_destroy(&wq->flush_cond);
pthread_mutex_destroy(&wq->stat_lock);
}
pthread_mutex_destroy(&wq->flush_lock);
pthread_cond_destroy(&wq->flush_cond);
pthread_mutex_destroy(&wq->stat_lock);
}
-static int start_worker(struct workqueue *wq, unsigned int index)
+static int start_worker(struct workqueue *wq, unsigned int index,
+ struct sk_out *sk_out)
{
struct submit_worker *sw = &wq->workers[index];
int ret;
{
struct submit_worker *sw = &wq->workers[index];
int ret;
@@
-268,6
+281,7
@@
static int start_worker(struct workqueue *wq, unsigned int index)
pthread_mutex_init(&sw->lock, NULL);
sw->wq = wq;
sw->index = index;
pthread_mutex_init(&sw->lock, NULL);
sw->wq = wq;
sw->index = index;
+ sw->sk_out = sk_out;
if (wq->ops.alloc_worker_fn) {
ret = wq->ops.alloc_worker_fn(sw);
if (wq->ops.alloc_worker_fn) {
ret = wq->ops.alloc_worker_fn(sw);
@@
-288,12
+302,13
@@
static int start_worker(struct workqueue *wq, unsigned int index)
}
int workqueue_init(struct thread_data *td, struct workqueue *wq,
}
int workqueue_init(struct thread_data *td, struct workqueue *wq,
- struct workqueue_ops *ops, unsigned max_pending)
+ struct workqueue_ops *ops, unsigned int max_workers,
+ struct sk_out *sk_out)
{
unsigned int running;
int i, error;
{
unsigned int running;
int i, error;
- wq->max_workers = max_
pending
;
+ wq->max_workers = max_
workers
;
wq->td = td;
wq->ops = *ops;
wq->work_seq = 0;
wq->td = td;
wq->ops = *ops;
wq->work_seq = 0;
@@
-305,7
+320,7
@@
int workqueue_init(struct thread_data *td, struct workqueue *wq,
wq->workers = calloc(wq->max_workers, sizeof(struct submit_worker));
for (i = 0; i < wq->max_workers; i++)
wq->workers = calloc(wq->max_workers, sizeof(struct submit_worker));
for (i = 0; i < wq->max_workers; i++)
- if (start_worker(wq, i))
+ if (start_worker(wq, i
, sk_out
))
break;
wq->max_workers = i;
break;
wq->max_workers = i;