workqueue常用的queue操作接口如下所示:
/**
 * schedule_work_on - put work task on a specific cpu
 * @cpu: cpu to put the work task on
 * @work: job to be done
 *
 * Queue @work on the system-wide default workqueue (system_wq),
 * bound to @cpu.  Returns %true if the work was newly queued,
 * %false if it was already pending.
 */
static inline bool schedule_work_on(int cpu, struct work_struct *work)
{
	bool queued;

	/* Thin convenience wrapper: delegate to the core with system_wq. */
	queued = queue_work_on(cpu, system_wq, work);
	return queued;
}
这个函数会把一个work任务放到一个特定的CPU上去执行,并且是使用系统的system_wq。
/**
 * schedule_work - put work task in global workqueue
 * @work: job to be done
 *
 * Returns %false if @work was already on the kernel-global workqueue and
 * %true otherwise.
 *
 * Queues @work on the kernel-global workqueue (system_wq) unless it is
 * already pending, in which case its queue position is left untouched.
 */
static inline bool schedule_work(struct work_struct *work)
{
	bool queued;

	/* Same as queue_work() but hard-wired to the default system_wq. */
	queued = queue_work(system_wq, work);
	return queued;
}
把一个work任务放到一个全局workqueue中去执行,queue_work实际上会把任务放到本CPU上的worker线程去处理。参见下面的实现:
/**
 * queue_work - queue work on a workqueue
 * @wq: workqueue to use
 * @work: work to queue
 *
 * Returns %false if @work was already on a queue, %true otherwise.
 *
 * We queue the work to the CPU on which it was submitted, but if the CPU dies
 * it can be processed by another CPU.
 */
static inline bool queue_work(struct workqueue_struct *wq,
			      struct work_struct *work)
{
	/*
	 * WORK_CPU_UNBOUND means "no CPU preference": the core code
	 * picks a suitable CPU at queueing time.
	 */
	return queue_work_on(WORK_CPU_UNBOUND, wq, work);
}
这些接口最终都是通过queue_work_on执行下去的。
/**
 * queue_work_on - queue work on a specific cpu
 * @cpu: CPU to queue the work on, or WORK_CPU_UNBOUND
 * @wq: workqueue to use
 * @work: work to queue
 *
 * Returns %false if @work was already pending (on some queue), %true
 * if it was newly queued.  Winning the atomic test-and-set of the
 * PENDING bit is what guarantees a work item sits on at most one
 * queue at a time.
 */
bool queue_work_on(int cpu, struct workqueue_struct *wq,
struct work_struct *work)
{
bool ret = false;
unsigned long flags;
/*
 * IRQs must stay disabled between claiming PENDING and actually
 * queueing; __queue_work() WARNs if they are not (see its comment
 * about PENDING-stealers busy-looping).
 */
local_irq_save(flags);
/* Only the caller that flips PENDING from 0 to 1 gets to queue. */
if (!test_and_set_bit(WORK_STRUCT_PENDING_BIT, work_data_bits(work))) {
__queue_work(cpu, wq, work);
ret = true;
}
local_irq_restore(flags);
return ret;
}
EXPORT_SYMBOL(queue_work_on);
这里会先判断该work的pending位,来确定该work是否正在一个worker线程上等待执行,如果是的话,说明它还在排队,并没有开始执行,那么就不重复加入了。否则我们就把该work的pending位置位并执行__queue_work。
/*
 * test_and_set_bit - atomically set bit @nr in @addr, return its old value.
 *
 * NOTE(review): this looks like the generic spinlock-protected fallback
 * (asm-generic style) used on architectures without native atomic bit
 * ops — confirm against the architecture being discussed.
 */
static inline int test_and_set_bit(int nr, volatile unsigned long *addr)
{
unsigned long mask = BIT_MASK(nr);	/* bit's mask within its word */
unsigned long *p = ((unsigned long *)addr) + BIT_WORD(nr);	/* word containing bit @nr */
unsigned long old;
unsigned long flags;
/* The spinlock (IRQ-safe) makes the read-modify-write of *p atomic. */
_atomic_spin_lock_irqsave(p, flags);
old = *p;
*p = old | mask;
_atomic_spin_unlock_irqrestore(p, flags);
/* Non-zero iff the bit was already set before this call. */
return (old & mask) != 0;
}
该函数会设置work结构体中的data成员的WORK_STRUCT_PENDING_BIT成员,并且返回它的旧值,这是一个由spinlock保护的临界区,所以当我们queue_work操作时,一个work只会被queue一次,因为一旦检测到该work处于pending状态,那么就不会重复queue了,这个逻辑防止了work的重入。下面进入分析的重头戏:
/*
 * __queue_work - pick the target pool_workqueue for @work and insert it.
 *
 * Called with IRQs disabled and the PENDING bit already claimed by the
 * caller (queue_work_on() and friends).  Selects a pwq from @cpu and the
 * workqueue's bound/unbound attribute, re-targets the pool @work last ran
 * on if it is still executing there (non-reentrancy guarantee), then links
 * @work onto the pool's worklist — or the pwq's delayed list when
 * max_active is exhausted.
 */
static void __queue_work(int cpu, struct workqueue_struct *wq,
struct work_struct *work)
{
struct pool_workqueue *pwq;
struct worker_pool *last_pool;
struct list_head *worklist;
unsigned int work_flags;
unsigned int req_cpu = cpu;	/* keep the caller's original request for tracing */
/*
 * While a work item is PENDING && off queue, a task trying to
 * steal the PENDING will busy-loop waiting for it to either get
 * queued or lose PENDING. Grabbing PENDING and queueing should
 * happen with IRQ disabled.
 */
WARN_ON_ONCE(!irqs_disabled());
debug_work_activate(work);
/* if draining, only works from the same workqueue are allowed */
if (unlikely(wq->flags & __WQ_DRAINING) &&
WARN_ON_ONCE(!is_chained_work(wq)))
return;
retry:
/* No explicit CPU requested: let the selector pick one. */
if (req_cpu == WORK_CPU_UNBOUND)
cpu = wq_select_unbound_cpu(raw_smp_processor_id());
/* pwq which will be used unless @work is executing elsewhere */
if (!(wq->flags & WQ_UNBOUND))
pwq = per_cpu_ptr(wq->cpu_pwqs, cpu);
else
pwq = unbound_pwq_by_node(wq, cpu_to_node(cpu));
/*
 * If @work was previously on a different pool, it might still be
 * running there, in which case the work needs to be queued on that
 * pool to guarantee non-reentrancy.
 */
last_pool = get_work_pool(work);
if (last_pool && last_pool != pwq->pool) {
struct worker *worker;
spin_lock(&last_pool->lock);
worker = find_worker_executing_work(last_pool, work);
if (worker && worker->current_pwq->wq == wq) {
/* still running on the old pool: queue there so it can't re-enter */
pwq = worker->current_pwq;
} else {
/* meh... not running there, queue here */
spin_unlock(&last_pool->lock);
spin_lock(&pwq->pool->lock);
}
} else {
spin_lock(&pwq->pool->lock);
}
/*
 * pwq is determined and locked. For unbound pools, we could have
 * raced with pwq release and it could already be dead. If its
 * refcnt is zero, repeat pwq selection. Note that pwqs never die
 * without another pwq replacing it in the numa_pwq_tbl or while
 * work items are executing on it, so the retrying is guaranteed to
 * make forward-progress.
 */
if (unlikely(!pwq->refcnt)) {
if (wq->flags & WQ_UNBOUND) {
spin_unlock(&pwq->pool->lock);
cpu_relax();
goto retry;
}
/* oops */
WARN_ONCE(true, "workqueue: per-cpu pwq for %s on cpu%d has 0 refcnt",
wq->name, cpu);
}
/* pwq determined, queue */
trace_workqueue_queue_work(req_cpu, pwq, work);
/* A work already linked somewhere indicates a bug; bail out. */
if (WARN_ON(!list_empty(&work->entry))) {
spin_unlock(&pwq->pool->lock);
return;
}
/* Account the work under the pwq's current color (used by flush). */
pwq->nr_in_flight[pwq->work_color]++;
work_flags = work_color_to_flags(pwq->work_color);
if (likely(pwq->nr_active < pwq->max_active)) {
trace_workqueue_activate_work(work);
pwq->nr_active++;
worklist = &pwq->pool->worklist;
/* First pending item on an idle pool: refresh the watchdog stamp. */
if (list_empty(worklist))
pwq->pool->watchdog_ts = jiffies;
} else {
/* max_active reached: park on the delayed list until a slot frees up. */
work_flags |= WORK_STRUCT_DELAYED;
worklist = &pwq->delayed_works;
}
insert_work(pwq, work, worklist, work_flags);
spin_unlock(&pwq->pool->lock);
}
__queue_work的主要目的是找到一个合适的worker_pool,并把insert_work到该worker_pool,每个worker_pool对应一个pool_workqueue结构体,这个结构体记录了worker_pool和work之间的绑定关系,所以该函数的关键是先找到pwq。该函数我把它分为4部分来分析:
第一部分是默认查找CPU,它又分为2种情况:
(1)接口传入的CPU是某一个CPU ID
(2)接口传入的CPU是WORK_CPU_UNBOUND,那么通过wq_select_unbound_cpu(raw_smp_processor_id())选择一个CPU,通常就是当前CPU
第二部分,根据上一步选择CPU确定pwq:
(1)如果workqueue的属性是percpu的(不包含WQ_UNBOUND),说明该workqueue中的work执行是在percpu worker_pool上,并且确定是上一步确定的CPU worker_pool上。
(2)如果workqueue的属性包含WQ_UNBOUND,说明该workqueue中的work执行在global worker_pool上。
第三部分,查看该work是否正在执行:
(1)如果该work正在某个worker_pool中执行,那么选定这个worker_pool为最终的目标
(2)如果该work没有在某个worker_pool中执行,那么依然使用上一步的选择。
第四部分,把work通过insert_work和worker_pool进行绑定。
到这里可能还会有一个疑问,就是上面我们提到,当加入一个work时会判断pending位来防止重入,那么既然能够执行进来说明work已经不在队列排队了,那么为什么最后还判断它是否在一个worker上运行呢?实际上这里最关键的是要区分pending和running这两个状态,一个work的pending和running是两种状态,work在被执行前会把pending位清除掉。假设一种情况,一个work恰好在被worker执行前清除了pending位,并且处于执行状态时,另一个线程使用queue来对work进行操作,就会遇到上述第三部分处理的情况了。
最后再总结一下percpu的workqueue(不包含WQ_UNBOUND):
1.percpu不代表其中所有的work都在同一个cpu上执行
2.work的执行是绑定CPU的,由传入的CPU ID参数指定
3.通过传入的参数WORK_CPU_UNBOUND,work依然可以变为非绑定状态