热门关键字:  ubuntu  分区  函数  linux系统进程  Fedora

当前位置 :| 主页>Linux教程>内核研究>

Linux操作系统内核中工作队列的操作

来源: 作者: 时间:2007-05-09 Tag: 点击:

本文档的Copyleft归yfydz所有,使用GPL发布,可以自由拷贝,转载,转载时请保持文档的完整性,严禁用于任何商业用途。

msn: yfydz_no1@hotmail.com

来源:http://yfydz.cublog.cn

1. 前言

工作队列(workqueue)的Linux内核中的定义的用来处理不是很紧急事件的回调方式处理方法.

以下代码的linux内核版本为2.6.19.2, 源代码文件主要为kernel/workqueue.c.

2. 数据结构

/* include/linux/workqueue.h */

// 工作节点结构

struct work_struct {

// 等待时间

unsigned long pending;

// 链表节点

struct list_head entry;

// workqueue回调函数

void (*func)(void *);

// 回调函数func的数据

void *data;

// 指向CPU相关数据, 一般指向struct cpu_workqueue_struct结构

void *wq_data;

// 定时器

struct timer_list timer;

};

struct execute_work {

struct work_struct work;

};

/* kernel/workqueue.c */

/*

* The per-CPU workqueue (if single thread, we always use the first

* possible cpu).

*

* The sequence counters are for flush_scheduled_work(). It wants to wait

* until all currently-scheduled works are completed, but it doesn't

* want to be livelocked by new, incoming ones. So it waits until

* remove_sequence is >= the insert_sequence which pertained when

* flush_scheduled_work() was called.

*/

// 这个结构是针对每个CPU的

struct cpu_workqueue_struct {

// 结构锁

spinlock_t lock;

// 下一个要执行的节点序号

long remove_sequence; /* Least-recently added (next to run) */

// 下一个要插入节点的序号

long insert_sequence; /* Next to add */

// 工作机构链表节点

struct list_head worklist;

// 要进行处理的等待队列

wait_queue_head_t more_work;

// 处理完的等待队列

wait_queue_head_t work_done;

// 工作队列节点

struct workqueue_struct *wq;

// 进程指针

struct task_struct *thread;

int run_depth; /* Detect run_workqueue() recursion depth */

} ____cacheline_aligned;

/*

* The externally visible workqueue abstraction is an array of

* per-CPU workqueues:

*/

// 工作队列结构

struct workqueue_struct {

struct cpu_workqueue_struct *cpu_wq;

const char *name;

struct list_head list; /* Empty if single thread */

};

kernel/workqueue.c中定义了一个工作队列链表, 所有工作队列可以挂接到这个链表中:

static LIST_HEAD(workqueues);

3. 一些宏定义

/* include/linux/workqueue.h */

// 初始化工作队列

#define __WORK_INITIALIZER(n, f, d) { \

// 初始化list

.entry = { &(n).entry, &(n).entry }, \

// 回调函数

.func = (f), \

// 回调函数参数

.data = (d), \

// 初始化定时器

.timer = TIMER_INITIALIZER(NULL, 0, 0), \

}

// 声明工作队列并初始化

#define DECLARE_WORK(n, f, d) \

struct work_struct n = __WORK_INITIALIZER(n, f, d)

/*

* initialize a work-struct's func and data pointers:

*/

// 重新定义工作结构参数

#define PREPARE_WORK(_work, _func, _data) \

do { \

(_work)->func = _func; \

(_work)->data = _data; \

} while (0)

/*

* initialize all of a work-struct:

*/

// 初始化工作结构, 和__WORK_INITIALIZER功能相同,不过__WORK_INITIALIZER用在

// 参数初始化定义, 而该宏用在程序之中对工作结构赋值

#define INIT_WORK(_work, _func, _data) \

do { \

INIT_LIST_HEAD(&(_work)->entry); \

(_work)->pending = 0; \

PREPARE_WORK((_work), (_func), (_data)); \

init_timer(&(_work)->timer); \

} while (0)

4. 操作函数

4.1 创建工作队列

一般的创建函数是create_workqueue, 但这其实只是一个宏:

/* include/linux/workqueue.h */

#define create_workqueue(name) __create_workqueue((name), 0)

在workqueue的初始化函数中, 定义了一个针对内核中所有线程可用的事件工作队列, 其他内核线程建立的事件工作结构就都挂接到该队列:

void init_workqueues(void)

{

...

keventd_wq = create_workqueue("events");

...

}

核心创建函数是__create_workqueue:

struct workqueue_struct *__create_workqueue(const char *name,

int singlethread)

{

int cpu, destroy = 0;

struct workqueue_struct *wq;

struct task_struct *p;

// 分配工作队列结构空间

wq = kzalloc(sizeof(*wq), GFP_KERNEL);

if (!wq)

return NULL;

// 为每个CPU分配单独的工作队列空间

wq->cpu_wq = alloc_percpu(struct cpu_workqueue_struct);

if (!wq->cpu_wq) {

kfree(wq);

return NULL;

}

wq->name = name;

mutex_lock(&workqueue_mutex);

if (singlethread) {

// 使用create_workqueue宏时该参数始终为0

// 如果是单一线程模式, 在单线程中调用各个工作队列

// 建立一个的工作队列内核线程

INIT_LIST_HEAD(&wq->list);

// 建立工作队列的线程

p = create_workqueue_thread(wq, singlethread_cpu);

if (!p)

destroy = 1;

else

// 唤醒该线程

wake_up_process(p);

} else {

// 链表模式, 将工作队列添加到工作队列链表

list_add(&wq->list, &workqueues);

// 为每个CPU建立一个工作队列线程

for_each_online_cpu(cpu) {

p = create_workqueue_thread(wq, cpu);

if (p) {

// 绑定CPU

kthread_bind(p, cpu);

// 唤醒线程

wake_up_process(p);

} else

destroy = 1;

}

}

mutex_unlock(&workqueue_mutex);

/*

* Was there any error during startup? If yes then clean up:

*/

if (destroy) {

// 建立线程失败, 释放工作队列

destroy_workqueue(wq);

wq = NULL;

}

return wq;

}

EXPORT_SYMBOL_GPL(__create_workqueue);

// 创建工作队列线程

static struct task_struct *create_workqueue_thread(struct workqueue_struct *wq,

int cpu)

{

// 每个CPU的工作队列

struct cpu_workqueue_struct *cwq = per_cpu_ptr(wq->cpu_wq, cpu);

struct task_struct *p;

spin_lock_init(&cwq->lock);

// 初始化

cwq->wq = wq;

cwq->thread = NULL;

cwq->insert_sequence = 0;

cwq->remove_sequence = 0;

INIT_LIST_HEAD(&cwq->worklist);

// 初始化等待队列more_work, 该队列处理要执行的工作结构

init_waitqueue_head(&cwq->more_work);

// 初始化等待队列work_done, 该队列处理执行完的工作结构

init_waitqueue_head(&cwq->work_done);

// 建立内核线程work_thread

if (is_single_threaded(wq))

p = kthread_create(worker_thread, cwq, "%s", wq->name);

else

p = kthread_create(worker_thread, cwq, "%s/%d", wq->name, cpu);

if (IS_ERR(p))

return NULL;

// 保存线程指针

cwq->thread = p;

return p;

}

static int worker_thread(void *__cwq)

{

struct cpu_workqueue_struct *cwq = __cwq;

// 声明一个等待队列

DECLARE_WAITQUEUE(wait, current);

// 信号

struct k_sigaction sa;

sigset_t blocked;

current->flags |= PF_NOFREEZE;

// 降低进程优先级, 工作进程不是个很紧急的进程,不和其他进程抢占CPU,通常在系统空闲时运行

set_user_nice(current, -5);

/* Block and flush all signals */

// 阻塞所有信号

sigfillset(&blocked);

sigprocmask(SIG_BLOCK, &blocked, NULL);

flush_signals(current);

/*

* We inherited MPOL_INTERLEAVE from the booting kernel.

* Set MPOL_DEFAULT to insure node local allocations.

*/

numa_default_policy();

/* SIG_IGN makes children autoreap: see do_notify_parent(). */

// 信号处理都是忽略

sa.sa.sa_handler = SIG_IGN;

sa.sa.sa_flags = 0;

siginitset(&sa.sa.sa_mask, sigmask(SIGCHLD));

do_sigaction(SIGCHLD, &sa, (struct k_sigaction *)0);

// 进程可中断

set_current_state(TASK_INTERRUPTIBLE);

// 进入循环, 没明确停止该进程就一直运行

while (!kthread_should_stop()) {

// 设置more_work等待队列, 当有新work结构链入队列中时会激发此等待队列

add_wait_queue(&cwq->more_work, &wait);

if (list_empty(&cwq->worklist))

// 工作队列为空, 睡眠

schedule();

else

// 进行运行状态

__set_current_state(TASK_RUNNING);

// 删除等待队列

remove_wait_queue(&cwq->more_work, &wait);

// 按链表遍历执行工作任务

if (!list_empty(&cwq->worklist))

run_workqueue(cwq);

// 执行完工作, 设置进程是可中断的, 重新循环等待工作

set_current_state(TASK_INTERRUPTIBLE);

}

__set_current_state(TASK_RUNNING);

return 0;

}

// 运行工作结构

static void run_workqueue(struct cpu_workqueue_struct *cwq)

{

unsigned long flags;

/*

* Keep taking off work from the queue until

* done.

*/

// 加锁

spin_lock_irqsave(&cwq->lock, flags);

// 统计已经递归调用了多少次了

cwq->run_depth++;

if (cwq->run_depth > 3) {

// 递归调用此时太多

/* morton gets to eat his hat */

printk("%s: recursion depth exceeded: %d\n",

__FUNCTION__, cwq->run_depth);

dump_stack();

}

// 遍历工作链表

while (!list_empty(&cwq->worklist)) {

// 获取的是next节点的

struct work_struct *work = list_entry(cwq->worklist.next,

struct work_struct, entry);

void (*f) (void *) = work->func;

void *data = work->data;

// 删除节点, 同时节点中的list参数清空

list_del_init(cwq->worklist.next);

// 解锁

// 现在在执行以下代码时可以中断,run_workqueue本身可能会重新被调用, 所以要判断递归深度

spin_unlock_irqrestore(&cwq->lock, flags);

BUG_ON(work->wq_data != cwq);

// 工作结构已经不在链表中

clear_bit(0, &work->pending);

// 执行工作函数

f(data);

// 重新加锁

spin_lock_irqsave(&cwq->lock, flags);

// 执行完的工作序列号递增

cwq->remove_sequence++;

// 唤醒工作完成等待队列, 供释放工作队列

wake_up(&cwq->work_done);

}

// 减少递归深度

cwq->run_depth--;

// 解锁

spin_unlock_irqrestore(&cwq->lock, flags);

}

上一篇:没有了
下一篇:没有了
最新评论共有 4 位网友发表了评论
发表评论
评论内容:不能超过250字,需审核,请自觉遵守互联网相关政策法规。
用户名: 密码:
匿名?
注册