|
目录
一、核心数据结构
1. struct work_struct
2. struct cpu_workqueue_struct
3. struct workqueue_struct
4. 这三个数据结构之间的关系
5. 工作队列实现框架(singlethread)
二、创建并初始化工作队列
三、工作者线程
四、调度一个任务到工作队列中
五、销毁工作队列
六、Linux内核维护的工作队列
双向链表操作函数
struct work_struct { unsigned long pending; struct list_head entry; /*将工作节点构成链表*/ void (*func)(void *); /*延时处理函数*/ void *data; void *wq_data; struct timer_list timer; }; |
struct cpu_workqueue_struct {
spinlock_t lock;
long remove_sequence; /* Least-recently added (next to run) */ long insert_sequence; /* Next to add */
struct list_head worklist; wait_queue_head_t more_work; wait_queue_head_t work_done;
struct workqueue_struct *wq; task_t *thread;
int run_depth; /* Detect run_workqueue() recursion depth */ } ____cacheline_aligned; |
struct workqueue_struct { struct cpu_workqueue_struct cpu_wq[NR_CPUS]; const char *name; struct list_head list; /* Empty if single thread */ };
|
#define create_workqueue(name) __create_workqueue((name), 0) #define create_singlethread_workqueue(name) __create_workqueue((name), 1) |
struct workqueue_struct *__create_workqueue(const char *name, int singlethread) { int cpu, destroy = 0; struct workqueue_struct *wq; struct task_struct *p;
BUG_ON(strlen(name) > 10);
/*分配一个workqueue结构,其中包括ncore个cpu_workqueue_struct*/ wq = kmalloc(sizeof(*wq), GFP_KERNEL); if (!wq) return NULL; memset(wq, 0, sizeof(*wq));/*初始化workqueue结构*/
wq->name = name; /* We don't need the distraction of CPUs appearing and vanishing. */ lock_cpu_hotplug(); if (singlethread) {/*如果为singlethread模式,则只需要创建一个workqueue*/ INIT_LIST_HEAD(&wq->list);/*自成一派,无需加入全局workqueue链表*/ p = create_workqueue_thread(wq, 0);/*只创建一个workqueue线程*/ if (!p) destroy = 1; else wake_up_process(p);/*启动workqueue线程*/ } else { spin_lock(&workqueue_lock); list_add(&wq->list, &workqueues);/*加入全局的workqueue链表中*/ spin_unlock(&workqueue_lock); for_each_online_cpu(cpu) { p = create_workqueue_thread(wq, cpu);/*为每一个CPU都创建一个workqueue线程*/ if (p) { kthread_bind(p, cpu);/*为每一个线程绑定特定cpu*/ wake_up_process(p);/*启动线程*/ } else destroy = 1; } } unlock_cpu_hotplug();
/* * Was there any error during startup? If yes then clean up: */ if (destroy) {/*出现错误,则销毁workqueue*/ destroy_workqueue(wq); wq = NULL; } return wq; } |
工作者线程相关参见下一小节。
在创建工作队列时,需要创建对应的工作者线程。那么为什么要创建工作者线程呢?原因是这样的:工作队列主要的目的是:为了简化在内核中创建线程而设计的。此外为了实现延迟任务,需要通过异步手段将此任务交由其他进程处理,而当前进程可以继续处理其他事物。通过工作队列相关的基础设施,人们不必再关心内核中如何创建、维护、销毁内核线程,如何调度任务等,而只需要关心与特定功能相关的事务。下面简单介绍下工作者线程:
/*工作队列管理结构初始化,并创建对应的线程*/ static struct task_struct *create_workqueue_thread(struct workqueue_struct *wq, int cpu) { struct cpu_workqueue_struct *cwq = wq->cpu_wq + cpu; struct task_struct *p;
spin_lock_init(&cwq->lock); cwq->wq = wq; cwq->thread = NULL; cwq->insert_sequence = 0; cwq->remove_sequence = 0; INIT_LIST_HEAD(&cwq->worklist);/*清空工作列表*/ init_waitqueue_head(&cwq->more_work); init_waitqueue_head(&cwq->work_done);
/*创建工作者线程并初始化为相应的name: name/cpu *工作者线程的执行体为:worker_thread */ if (is_single_threaded(wq)) p = kthread_create(worker_thread, cwq, "%s", wq->name); else p = kthread_create(worker_thread, cwq, "%s/%d", wq->name, cpu); if (IS_ERR(p)) return NULL; cwq->thread = p;/*工作队列绑定线程*/ return p; } |
static int worker_thread(void *__cwq) { struct cpu_workqueue_struct *cwq = __cwq; DECLARE_WAITQUEUE(wait, current); struct k_sigaction sa; sigset_t blocked;
current->flags |= PF_NOFREEZE;
set_user_nice(current, -5);
/* Block and flush all signals */ sigfillset(&blocked); sigprocmask(SIG_BLOCK, &blocked, NULL); flush_signals(current);
/* SIG_IGN makes children autoreap: see do_notify_parent(). */ sa.sa.sa_handler = SIG_IGN; sa.sa.sa_flags = 0; siginitset(&sa.sa.sa_mask, sigmask(SIGCHLD)); do_sigaction(SIGCHLD, &sa, (struct k_sigaction *)0);
set_current_state(TASK_INTERRUPTIBLE); while (!kthread_should_stop()) {/*没有其他线程调用kthread_stop关闭此线程*/ add_wait_queue(&cwq->more_work, &wait); if (list_empty(&cwq->worklist))/*如果任务列表中没有工作任务,则让出CPU*/ schedule(); else __set_current_state(TASK_RUNNING); remove_wait_queue(&cwq->more_work, &wait);
if (!list_empty(&cwq->worklist))/*任务列表中有工作任务,则执行任务*/ run_workqueue(cwq); set_current_state(TASK_INTERRUPTIBLE); } __set_current_state(TASK_RUNNING); return 0; } |
/*工作者线程执行工作链表上的延时任务*/ static inline void run_workqueue(struct cpu_workqueue_struct *cwq) { unsigned long flags;
/* * Keep taking off work from the queue until * done. */ spin_lock_irqsave(&cwq->lock, flags); cwq->run_depth++; if (cwq->run_depth > 3) { /* morton gets to eat his hat */ printk("%s: recursion depth exceeded: %d\n", __FUNCTION__, cwq->run_depth); dump_stack(); } while (!list_empty(&cwq->worklist)) { struct work_struct *work = list_entry(cwq->worklist.next, struct work_struct, entry); void (*f) (void *) = work->func;/*取出任务操作以及参数*/ void *data = work->data; list_del_init(cwq->worklist.next);/*从worklist中删除任务*/
spin_unlock_irqrestore(&cwq->lock, flags);
BUG_ON(work->wq_data != cwq); clear_bit(0, &work->pending);/*标识此任务已经被调度执行*/ f(data);/*执行此任务*/
spin_lock_irqsave(&cwq->lock, flags); cwq->remove_sequence++; wake_up(&cwq->work_done); } cwq->run_depth--; spin_unlock_irqrestore(&cwq->lock, flags); } |
将任务work_struct添加到工作队列中相对比较简单:只需要将work_struct结构通过struct_listhead链到workqueue的worklist上即可。代码实现详见下表:
int fastcall queue_work(struct workqueue_struct *wq, struct work_struct *work) { int ret = 0, cpu = get_cpu(); /*如果work->pending第0位为1,则说明当前任务已经被提交,但尚未执行 * 如果为work->pending第0位为0,表示该任务尚未提交,可以进行提交 */ if (!test_and_set_bit(0, &work->pending)) {/*返回pending的第0位,并将其置1*/ if (unlikely(is_single_threaded(wq))) cpu = 0; BUG_ON(!list_empty(&work->entry)); __queue_work(wq->cpu_wq + cpu, work);/*调度到特定CPU的worklist上*/ ret = 1; } put_cpu(); return ret; } |
/* Preempt must be disabled. */ static void __queue_work(struct cpu_workqueue_struct *cwq, struct work_struct *work) { unsigned long flags;
spin_lock_irqsave(&cwq->lock, flags); work->wq_data = cwq; list_add_tail(&work->entry, &cwq->worklist);/*将任务添加到工作列表中*/ cwq->insert_sequence++; wake_up(&cwq->more_work);/*唤醒可能正在睡眠的工作者线程*/ spin_unlock_irqrestore(&cwq->lock, flags); } |
在销毁工作队列时,如果工作链表worklist中仍然有等待执行的任务,可以有两种操作:①全部丢弃;②全部执行完毕后再销毁工作队列。Linux内核中不同版本处理方式不一致,参考Linux-2.6.12源码,是讲工作链表中的任务全部执行完毕后再销毁工作队列。
void destroy_workqueue(struct workqueue_struct *wq) { int cpu;
flush_workqueue(wq);/*确保提交到worklist的任务都全部执行完成*/
/* We don't need the distraction of CPUs appearing and vanishing. */ lock_cpu_hotplug(); if (is_single_threaded(wq)) cleanup_workqueue_thread(wq, 0);/*清除工作者线程*/ else { for_each_online_cpu(cpu) cleanup_workqueue_thread(wq, cpu);/*清除工作者线程*/ spin_lock(&workqueue_lock); list_del(&wq->list);/*从全局工作队列链表上摘除*/ spin_unlock(&workqueue_lock); } unlock_cpu_hotplug(); kfree(wq); } |
static void cleanup_workqueue_thread(struct workqueue_struct *wq, int cpu) { struct cpu_workqueue_struct *cwq; unsigned long flags; struct task_struct *p;
cwq = wq->cpu_wq + cpu; spin_lock_irqsave(&cwq->lock, flags); p = cwq->thread; cwq->thread = NULL; spin_unlock_irqrestore(&cwq->lock, flags); if (p) kthread_stop(p);/*停止工作者线程,此函数会阻塞*/ } |
内核在启动过程中初始化了一个(可能有多个)工作队列keventd_wq(name为events.)
这个工作队列我们都可以使用,如果不想自己创建单独的工作队列,我们完全可以使用events队列来完成我们的任务。需要说明的是:events可能有很多任务需要处理,因此效率上可能不是很高,因此需要视具体情况而定。events调用接口在上面提供的接口基础上又重新做了封装。
/*内核启动过程中定义的工作队列:kevenetd_wq*/ static struct workqueue_struct *keventd_wq; |
void init_workqueues(void) { hotcpu_notifier(workqueue_cpu_callback, 0); keventd_wq = create_workqueue("events");/*创建keventd_wq*/ BUG_ON(!keventd_wq); } |
int fastcall schedule_work(struct work_struct *work) { return queue_work(keventd_wq, work); } |
int fastcall schedule_delayed_work(struct work_struct *work, unsigned long delay) { return queue_delayed_work(keventd_wq, work, delay); } |