irq_work 主要是提供一个在中断上下文执行回调函数的框架。主要逻辑是先通过enqueue work(NMI save的),然后触发一个IPI中断,然后在IPI中断中执行enqueue的work func。其它路径下也有调用回调函数,比如offline cpu、进入idle等。
主要实现文件是 kernel/irq_work.c,使用该功能需要开启 CONFIG_IRQ_WORK。
1. struct irq_work
/* * An entry can be in one of four states: * * free NULL, 0 -> {claimed} : free to be used * claimed NULL, 3 -> {pending} : claimed to be enqueued * pending next, 3 -> {busy} : queued, pending callback * busy NULL, 2 -> {free, claimed} : callback in progress, can be claimed */ struct irq_work { //include/linux/irq_work.h union { /* 主要用于在触发非本地CPU的IPI中断时,挂入到对应CPU的队列中 */ struct __call_single_node node; struct { /* 通过它挂入 azy_list 或 raised_list 链表 */ struct llist_node llnode; /* 表示当前此irq_work的使用状态 */ atomic_t flags; }; }; /* 回调函数,在中断上下文中执行 */ void (*func)(struct irq_work *); };
由于向非local cpu进行queue的时候使用的是 llnode 成员,其它cpu响应时使用的是 node 成员,因此它两都要放在结构体的首位置。
2. 两个per-cpu的单链表
static DEFINE_PER_CPU(struct llist_head, raised_list); static DEFINE_PER_CPU(struct llist_head, lazy_list);
有两种 irq_work 类型,分别串联在两个无锁per-cpu单链表上。若queue work时 work->flags 有 IRQ_WORK_LAZY 标志,就会放到 lazy_list 链表,否则放到 raised_list 链表。这两个链表上的work在调用时机上会有所不同。
1. 初始化irq_work
可以使用函数进行初始化,也可以使用宏进行初始化
static inline void init_irq_work(struct irq_work *work, void (*func)(struct irq_work *)) { atomic_set(&work->flags, 0); work->func = func; } #define DEFINE_IRQ_WORK(name, _f) struct irq_work name = { \ .flags = ATOMIC_INIT(0), \ .func = (_f) \ }
2. enqueue irq_work
可以enqueue到当前CPU上,也可以enqueue到指定CPU上。enqueue到不同的CPU上其调用方式不同
(1) enqueue 到当前CPU上
/* Enqueue the irq work @work on the current CPU */ bool irq_work_queue(struct irq_work *work) { /* Only queue if not already pending */ if (!irq_work_claim(work)) return false; /* Queue the entry and raise the IPI if needed. */ preempt_disable(); __irq_work_queue_local(work); preempt_enable(); return true; } EXPORT_SYMBOL_GPL(irq_work_queue);
在queue之前,先调用 irq_work_claim() 判断此work此时是否可以使用,若别人事先已经对其flag标注了 IRQ_WORK_PENDING,则表示此
woek已经被enqueue过了,还没来得及处理,是不允许重复对其进行enqueue的,直接返回false。在这个work被处理时,在执行其回调之前
在 irq_work_single() 中清理 IRQ_WORK_PENDING 标志,在执行其回调之后,清理 IRQ_WORK_BUSY 标志。
/* Claim the entry so that no one else will poke at it. */ static bool irq_work_claim(struct irq_work *work) { int oflags; /* * 先返回 work->flags 的值,然后或上arg1 * IRQ_WORK_CLAIMED == IRQ_WORK_PENDING | IRQ_WORK_BUSY */ oflags = atomic_fetch_or(IRQ_WORK_CLAIMED | CSD_TYPE_IRQ_WORK, &work->flags); /* * If the work is already pending, no need to raise the IPI. * The pairing atomic_fetch_andnot() in irq_work_run() makes sure * everything we did before is visible. */ if (oflags & IRQ_WORK_PENDING) return false; return true; }
向本地链表上queue irq work。若指定了 IRQ_WORK_LAZY 标志,则会添加到当前CPU的 lazy_list 链表中,若是链表上的首个work且此时动态时钟tick已经关闭了,那么就会触发irq_work进行处理,否则会等到下一个tick处理回调函数。
若是没有指定 IRQ_WORK_LAZY 标志,则会添加到当前CPU的 raised_list 链表中,若是链表上的首个work,则会触发对irq_work的处理。处理流程放到下节讲。
/* Enqueue on current CPU, work must already be claimed and preempt disabled */ static void __irq_work_queue_local(struct irq_work *work) { /* If the work is "lazy", handle it from next tick if any */ if (atomic_read(&work->flags) & IRQ_WORK_LAZY) { /* * llist_add: 支持无锁链表操作,在添加节点之前链表为空则返回真,否则返回假 * tick_nohz_tick_stopped: idle状态的周期时钟已经停止为真 * 两者同时满足才会raise IPI中断(通常应该比较难满足)。 */ if (llist_add(&work->llnode, this_cpu_ptr(&lazy_list)) && tick_nohz_tick_stopped()) arch_irq_work_raise(); //queue的时候已经调用了 } else { /* 在添加节点之前链表为空则返回真,否则返回假 */ if (llist_add(&work->llnode, this_cpu_ptr(&raised_list))) arch_irq_work_raise(); //queue的时候已经调用了 } }
(2) enqueue 到当指定CPU上
/* * Enqueue the irq_work @work on @cpu unless it's already pending somewhere. * * Can be re-enqueued while the callback is still in progress. */ bool irq_work_queue_on(struct irq_work *work, int cpu) { /* All work should have been flushed before going offline */ WARN_ON_ONCE(cpu_is_offline(cpu)); /* Only queue if not already pending 同样有对work此时状态的判断 */ if (!irq_work_claim(work)) return false; preempt_disable(); /* queue到当前cpu的处理逻辑和非当前cpu的是不同的 */ if (cpu != smp_processor_id()) { /* Arch remote IPI send/receive backend aren't NMI safe */ WARN_ON_ONCE(in_nmi()); /* 最终调用:smp_cross_call(cpumask_of(cpu), IPI_CALL_FUNC); */ __smp_call_single_queue(cpu, &work->llnode); } else { __irq_work_queue_local(work); } preempt_enable(); return true; } EXPORT_SYMBOL_GPL(irq_work_queue_on);
1. queue到当前CPU上且queue之前链表为空,此时会直接调用 arch_irq_work_raise() 来触发当前CPU上的IPI中断进行处理,其实现依赖于具体的体系架构,在Arm64上是:
/* 但是没有EXPORT_SYMBOL_GPL,模块中无法使用 */ void arch_irq_work_raise(void) //arch/arm64/kernel/smp.c { //此类型的IPI中断在trace上显示的是"IRQ work interrupts" smp_cross_call(cpumask_of(smp_processor_id()), IPI_IRQ_WORK); } /* * Main handler for inter-processor interrupts */ static void do_handle_IPI(int ipinr) { unsigned int cpu = smp_processor_id(); if ((unsigned)ipinr < NR_IPI) trace_ipi_entry_rcuidle(ipi_types[ipinr]); //单检索只有这里一个 trace_ipi_entry trece event switch (ipinr) { ... case IPI_CALL_FUNC: generic_smp_call_function_interrupt(); break; ... case IPI_IRQ_WORK: irq_work_run(); break; ... } if ((unsigned)ipinr < NR_IPI) trace_ipi_exit_rcuidle(ipi_types[ipinr]); }
调用路径:
gic_smp_init set_smp_ipi_range request_percpu_irq(ipi_base + i, ipi_handler, "IPI", &cpu_number); //为每个IPI irq类型注册一个per-cpu的irq中断 ipi_handler do_handle_IPI(irq - ipi_irq_base);
irq_work_run() 里面会去遍历 raised_list 和 lazy_list,调用上面的所有回调函数。这些回调函数都是在中断上下文中被调用的。
/* * hotplug calls this through: hotplug_cfd() -> flush_smp_call_function_queue() */ void irq_work_run(void) //TODO: 看谁在哪调用的? { /*前者优先级比后者高,只有前者链表上所有回调都执行完后才会调用后者的 */ irq_work_run_list(this_cpu_ptr(&raised_list)); irq_work_run_list(this_cpu_ptr(&lazy_list)); } EXPORT_SYMBOL_GPL(irq_work_run); static void irq_work_run_list(struct llist_head *list) { struct irq_work *work, *tmp; struct llist_node *llnode; BUG_ON(!irqs_disabled()); if (llist_empty(list)) return; /* * 返回链表头,然后将链表头设置为NULL,也就是将链表清空, * 之后enqueue的irq work就更可能重新触发IPI中断了。 */ llnode = llist_del_all(list); /* 逐个处理上面的work func 回调 */ llist_for_each_entry_safe(work, tmp, llnode, llnode) irq_work_single(work); }
处理单个irq_work的回调函数,在这个函数中可以debug执行的回调函数名和执行耗时。
void irq_work_single(void *arg) { struct irq_work *work = arg; int flags; #if IS_ENABLED(CONFIG_MY_IRQ_MONITOR_DEBUG) u64 start, end, process_time; #endif /* * 翻译: * 清除了这个标志位后,这个work才能被重新利用。 * 让它立即可见,这样当我们在函数中间时,其他试图claim工作的CPU * 就不会依赖我们来处理他们的数据。 * * 先返回 work->flags 的值,然后再 work->flags &= ~IRQ_WORK_PENDING; */ flags = atomic_fetch_andnot(IRQ_WORK_PENDING, &work->flags); #if IS_ENABLED(CONFIG_MY_IRQ_MONITOR_DEBUG) //GKI内核中没有这些 start = sched_clock(); #endif lockdep_irq_work_enter(work); //默认不使能CONFIG_TRACE_IRQFLAGS,为空函数 /* 处理irq_work的回调函数,参数是irq_work本身 */ work->func(work); lockdep_irq_work_exit(work); //默认不使能CONFIG_TRACE_IRQFLAGS,为空函数s #if IS_ENABLED(CONFIG_MY_IRQ_MONITOR_DEBUG) end = sched_clock(); process_time = end - start; if (process_time > 5000000L) // > 5ms pr_notice("irq_monitor: function: %pS time: %lld func: %s line: %d", work->func, process_time, __func__, __LINE__); #endif /* Clear the BUSY bit and return to the free state if no-one else claimed it meanwhile. */ flags &= ~IRQ_WORK_PENDING; /* 若 work->flags == flags,则 work->flags = flags & ~IRQ_WORK_BUSY */ (void)atomic_cmpxchg(&work->flags, flags, flags & ~IRQ_WORK_BUSY); }
2. 若irq_work queue到非当前CPU上会调用到 __smp_call_single_queue()
void __smp_call_single_queue(int cpu, struct llist_node *node) //kernel/smp.c { if (llist_add(node, &per_cpu(call_single_queue, cpu))) send_call_function_single_ipi(cpu); } void send_call_function_single_ipi(int cpu) //kernel/sched/core.c { struct rq *rq = cpu_rq(cpu); /* SMP下函数返回false,非一下后恒为真 */ if (!set_nr_if_polling(rq->idle)) arch_send_call_function_single_ipi(cpu); else trace_sched_wake_idle_without_ipi(cpu); } void arch_send_call_function_single_ipi(int cpu) { smp_cross_call(cpumask_of(cpu), IPI_CALL_FUNC); }
如上,在 do_handle_IPI() 中会调用到 generic_smp_call_function_interrupt()
#define generic_smp_call_function_interrupt generic_smp_call_function_single_interrupt //include/linux/smp.h /** * generic_smp_call_function_single_interrupt - Execute SMP IPI callbacks * * Invoked by arch to handle an IPI for call function single. * Must be called with interrupts disabled. */ void generic_smp_call_function_single_interrupt(void) //kernel/smp.c { flush_smp_call_function_queue(true); } /** * flush_smp_call_function_queue - Flush pending smp-call-function callbacks * * @warn_cpu_offline: If set to 'true', warn if callbacks were queued on an * offline CPU. Skip this check if set to 'false'. * * Flush any pending smp-call-function callbacks queued on this CPU. This is * invoked by the generic IPI handler, as well as by a CPU about to go offline, * to ensure that all pending IPI callbacks are run before it goes completely * offline. * * Loop through the call_single_queue and run all the queued callbacks. * Must be called with interrupts disabled. */ static void flush_smp_call_function_queue(bool warn_cpu_offline) { call_single_data_t *csd, *csd_next; struct llist_node *entry, *prev; struct llist_head *head; static bool warned; lockdep_assert_irqs_disabled(); /* 取出所有的entry并reverse链表 */ head = this_cpu_ptr(&call_single_queue); entry = llist_del_all(head); entry = llist_reverse_order(entry); ... prev = NULL; llist_for_each_entry_safe(csd, csd_next, entry, llist) { int type = CSD_TYPE(csd); ... if (prev) { prev->next = &csd_next->llist; //从链表中删除csd节点 } else { entry = &csd_next->llist; } if (type == CSD_TYPE_IRQ_WORK) { /* 遍历挂到此CPU上的irq_work,逐个处理 */ irq_work_single(csd); } ... prev = &csd->llist; //prev顺着链表路由 } ... /* * Third; only CSD_TYPE_TTWU is left, issue those. * 唤醒非local cpu上的任务,是最后处理的,可能会有一定的延迟!这些延迟可能来自处理上面的这些逻辑 */ if (entry) sched_ttwu_pending(entry); }
3. 在CPU offline的处理路径中也会处理所有的irq_work
struct cpuhp_step cpuhp_hp_states[] { //kernel/cpu.c ... [CPUHP_AP_SMPCFD_DYING] = { .name = "smpcfd:dying", .startup.single = NULL, .teardown.single = smpcfd_dying_cpu, }, ... } int smpcfd_dying_cpu(unsigned int cpu) { flush_smp_call_function_queue(false); //会处理per-cpu call_single_queue 上的work,包括irq_work irq_work_run(); //会处理本cpu raised_list 和 lazy_list 链表上的work return 0; }
4. 任务迁移走或进入idle前也会调用
__set_cpus_allowed_ptr_locked //kernel/sched/core.c sched_exec //kernel/sched/core.c stop_one_cpu migration_cpu_stop //kernel/sched/core.c //(1)将任务迁移走之前,先对当前cpu执行一次 do_idle //kernel/sched/idle.c //(2)在进入idle之前会调用一次 flush_smp_call_function_from_idle flush_smp_call_function_queue(true); //会处理per-cpu call_single_queue 上的work,包括irq_work
5. tick的handler中也有处理,但是只处理的是 lazy_list 链表上的irq_work
void irq_work_tick(void) { struct llist_head *raised = this_cpu_ptr(&raised_list); /* 后者恒返回真,非后if判断恒为假 */ if (!llist_empty(raised) && !arch_irq_work_has_interrupt()) irq_work_run_list(raised); //只运行的是这个 irq_work_run_list(this_cpu_ptr(&lazy_list)); }
调用路径:
tick_nohz_handler //kernel/time/tick-sched.c 当切换至低分辨率动态时钟模式后,tick中断处理函数 tick_sched_timer //kernel/time/tick-sched.c 模拟tick时间的hrtimer的回调函数 tick_sched_handle //kernel/time/tick-sched.c update_process_times //kernel/time/timer.c 更新进程信息和触发定时器软中断等 if (in_irq()) irq_work_tick(); //irq_work.c
1. 使用 raised_list
//1. 初始化 #include <linux/irq_work.h> /* 定义一个 irq_work 结构 */ struct irq_work irq_work; void irq_work_func(struct irq_work *work) { pr_info("I am excuted in interrupt context\n"); } init_irq_work(&irq_work, irq_work_func); //2. 使用时 enqueue work irq_work_queue(&irq_work);