引用
preempt
宋宝华: 是谁关闭了Linux抢占,而抢占又关闭了谁?
Linux用户抢占和内核抢占详解(概念, 实现和触发时机)--Linux进程的管理与调度(二十)
内核抢占实现(preempt)
Linux中的preempt_count - 知乎 (zhihu.com)
linux 中断子系统 - linux 内核中的上下文判断 - 知乎 (zhihu.com)
softirq
linux kernel的中断子系统之(八):softirq
Linux Interrupt - 魅族内核团队
sofirq和tasklet - LoyenWang
tasklet
linux kernel的中断子系统之(九):tasklet
高并发的中断下半部tasklet实例解析
workqueue
Linux Workqueue- 魅族内核团队
Concurrency Managed Workqueue - wowo
Linux中断子系统(四)-Workqueue - LoyenWang
任务工厂 - Linux 中的 workqueue 机制 [一] - 知乎 (zhihu.com)
Linux Workqueue 机制分析 - 博客 - binsite (binss.me)
Linux中断管理 (3)workqueue工作队列 - ArnoldLu - 博客园 (cnblogs.com)
timer
Linux时间子系统之(二):软件架构
Linux kernel之内核定时器
Linux内核定时器和工作队列的总结和实例
Linux内核高精度定时器hrtimer的使用
Linux 应用层的时间编程
硬件和 GLibC 库的细节
Linux 内核的工作 - timer
Linux 内核的工作-hrtimer
一. 为什么要有上下半部
中断分成上下半部处理可以提高中断的响应能力,在上半部处理完成后便将cpu中断打开(通常上半部处理越快越好),这样就可以响应其他中断了,等到中断退出的时候再进行下半部的处理。
二. preempt_count
task_struct结构体中的thread_info.preempt_count用于记录当前任务所处的context状态;
PREEMPT_BITS
用于记录禁止抢占的次数,禁止抢占一次该值就加1,使能抢占该值就减1;
SOFTIRQ_BITS
用于同步处理,关掉下半部的时候加1,打开下半部的时候减1;
HARDIRQ_BITS
用于表示处于硬件中断上下文中;
in_softirq和in_serving_softirq都表示处于softirq上下文,但并不意味着程序正在执行软中断,区别是:
in_serving_softirq表示 当前一定有软中断处于执行状态。(bit8 - SOFTIRQ_OFFSET)
in_softirq 除了可以表示当前有软中断处于执行状态,还有可能表示当前的context只是disable软中断的thread上下文。(例如:local_bh_disable()下的context)
中断上下文 - interrupt context
我们将 NMI, HARDIRQ, SOFTIRQ 上下文 统称为中断上下文。
可用 in_interrupt() 判断
进程上下文 - process context
与中断上下文相对应。
可用 in_task() 判断
原子上下文 - atomic context
不能发生进程睡眠或者调度的上下文。
处于中断上下文,或者显示地禁止了调度,preempt_count()的值都不为0,都不允许睡眠/调度的发生,这两种场景被统称为atomic上下文。
可用 in_atomic() 来判断当前cpu是否处于atomic上下文。
也就是非 preempt_count 非 0 时,都属于 atomic 上下文,其中包括中断、软中断等中断上下文,还包括进程或者内核线程运行时关中断或者关抢占。
由于该接口在有些场景下不能精确检测,所以 不推荐在driver中使用。
三种上下文的关系
三. softirq
softirq是静态的,不支持动态分配。
相关数据结构
/* 支持的软中断类型,可以认为是软中断号, 其中从上到下优先级递减 */
enum
{HI_SOFTIRQ=0, /* 最高优先级软中断 */TIMER_SOFTIRQ, /* Timer定时器软中断 */NET_TX_SOFTIRQ, /* 发送网络数据包软中断 */NET_RX_SOFTIRQ, /* 接收网络数据包软中断 */BLOCK_SOFTIRQ, /* 块设备软中断 */IRQ_POLL_SOFTIRQ, /* 块设备软中断 */TASKLET_SOFTIRQ, /* tasklet软中断 */SCHED_SOFTIRQ, /* 进程调度及负载均衡的软中断 */HRTIMER_SOFTIRQ, /* Unused, but kept as tools rely on thenumbering. Sigh! */RCU_SOFTIRQ, /* Preferable RCU should always be the last softirq, RCU相关的软中断 */NR_SOFTIRQS
};/* 软件中断描述符,只包含一个handler函数指针 */
struct softirq_action {void (*action)(struct softirq_action *);
};/* 软中断描述符表,实际上就是一个全局的数组 */
static struct softirq_action softirq_vec[NR_SOFTIRQS] __cacheline_aligned_in_smp;/* CPU软中断状态描述,当某个软中断触发时,__softirq_pending会置位对应的bit */
typedef struct {unsigned int __softirq_pending;unsigned int ipi_irqs[NR_IPI];
} ____cacheline_aligned irq_cpustat_t;/* 每个CPU都会维护一个状态信息结构 */
irq_cpustat_t irq_stat[NR_CPUS] ____cacheline_aligned;/* 内核为每个CPU都创建了一个软中断处理内核线程 */
DEFINE_PER_CPU(struct task_struct *, ksoftirqd);
数据结构关系图
softirq_vec[]数组,类比硬件中断描述符表irq_desc[],通过软中断号可以找到对应的handler进行处理,比如图中的tasklet_action就是一个实际的handler函数;
软中断可以在不同的CPU上并行运行,在同一个CPU上只能串行执行;(即软中断不保证重入问题)
每个CPU维护irq_cpustat_t状态结构,当某个软中断需要进行处理时,会将该结构体中的__softirq_pending字段或上1UL << XXX_SOFTIRQ;
软中断的触发点
raise_softirq()/raise_softirq_irqoff() 会设置当前本地cpu的irq_stat中的 __softirq_pending字段,并将相应的软中断号置位,即表明该软中断有处理请求。
软中断执行点
中断处理后;
bottom-half enable后;
思考
为什么在使能Bottom-half时要进行软中断处理呢?
==》
在并发处理时,可能已经把Bottom-half进行关闭了,如果此时中断来了后,软中断不会被处理,在进程上下文中打开Bottom-half时,这时候就会检查是否有软中断处理请求了;
四. tasklet
tasklet是软中断的一种类型,那么两者有啥区别呢?
软中断类型内核中都是静态分配,不支持动态分配,而tasklet支持动态和静态分配,也就是驱动程序中能比较方便的进行扩展;
软中断可以在多个CPU上并行运行,因此需要考虑可重入问题,而tasklet会绑定在某个CPU上运行,运行完后再解绑,不要求重入问题,当然它的性能也就会下降一些;
DEFINE_PER_CPU(struct tasklet_head, tasklet_vec)为每个CPU都分配了tasklet_head结构,该结构用来维护struct tasklet_struct链表,需要放到该CPU上运行的tasklet将会添加到该结构的链表中,内核中为每个CPU维护了两个链表tasklet_vec和tasklet_vec_hi,对应两个不同的优先级,本文以tasklet_vec为例;
struct tasklet_struct为tasklet的抽象,几个关键字段如图所示,通过next来链接成链表,通过state字段来标识不同的状态以确保能在CPU上串行执行,func函数指针在调用task_init()接口时进行初始化,并在最终触发软中断时执行;
接口
/* 静态分配tasklet */
DECLARE_TASKLET(name, func, data)/* 动态分配tasklet */
void tasklet_init(struct tasklet_struct *t, void (*func)(unsigned long), unsigned long data);/* 禁止tasklet被执行,本质上是增加tasklet_struct->count值,以便在调度时不满足执行条件 */
void tasklet_disable(struct tasklet_struct *t);/* 使能tasklet,与tasklet_diable对应 */
void tasklet_enable(struct tasklet_struct *t);/* 调度tasklet,通常在设备驱动的中断函数里调用 */
void tasklet_schedule(struct tasklet_struct *t);/* 杀死tasklet,确保不被调度和执行, 主要是设置state状态位 */
void tasklet_kill(struct tasklet_struct *t);
五. workqueue/delay wrokqueue
linux workqueue机制有多个woker 线程?
有多个worker_pool,管理多个worker。
针对bound绑定类型的工作队列,worker_pool是Per-CPU创建,每个CPU都有两个worker_pool,对应不同的优先级,nice值分别为0和-20;
针对un-bound非绑定类型的工作队列,worker_pool创建后会添加到unbound_pool_hash哈希表中;
每个worker_pool至少有一个worker。
worker内核线程是在每个worker_pool中由一个初始的空闲工作线程创建的,并根据需要动态创建和销毁;
create_worker函数中,创建的内核线程名字为kworker/XX:YY或者kworker/uXX:YY,其中XX表示worker_pool的编号,YY表示worker的编号,u表示unbound;
bound和un-bound workqueue的区别?
bound:绑定处理器的工作队列,其会被bound的worker_pool服务,该worker_pool创建的worker内核线程会被绑定到特定的CPU上运行;
unbound:不绑定处理器的工作队列,其会被un-bound的worker_pool服务,创建的时候需要指定WQ_UNBOUND标志,内核线程可以在处理器间迁移;
何时创建更多的worker?
内核线程执行worker_thread函数时,如果没有空闲的worker,会调用manage_workers接口来创建更多的worker来处理工作;
何时销毁多余的worker?
一个worker被创建后首先进入worker_enter_idle(),里面启动了pool->idle_timer,定时IDLE_WORKER_TIMEOUT即300HZ。如果一个worker进入idle超过300HZ,即会执行idle_worker_timeout(),会根据情况进行销毁多余的worker。
如何解决一个work阻塞或者死锁了,导致其他的work得不到执行,即各种work之间的互相影响?
在worker线程执行时,会尝试进行worker_pool管理工作,即会检查worker_pool中是否有至少一个idle状态的worker,如果没有,则创建一个新的worker。
系统每100ms启动pool->mayday_timer定时器,检查当前workerpool中是否存在allocation deadlock异常,如果指定了WQ_MEM_RECLAIM,则会启动rescuer worker 进行处理。
管理worker_pool的内核线程池时,如果有PENDING状态的work,并且发现没有正在运行的工作线程(worker_pool->nr_running == 0),唤醒空闲状态的内核线程,或者动态创建内核线程;
如果work已经在同一个worker_pool的其他worker中执行,不再对该work进行处理(防重入处理);
schedule_work 是如何工作的?
schedule_work完成的工作是将work添加到对应的链表中,而在添加的过程中,首先是需要确定pool_workqueue;
pool_workqueue对应一个worker_pool,因此确定了pool_workqueue也就确定了worker_pool,进而可以将work添加到工作链表中;
pool_workqueue的确定分为三种情况:
bound类型的工作队列,直接根据CPU号获取(可指定cpu,如果没有指定,则用当前cpu。);
unbound类型的工作队列,根据node号获取,针对unbound类型工作队列,pool_workqueue的释放是异步执行的,需要判断refcnt的计数值,因此在获取pool_workqueue时可能要多次retry;
根据缓存热度,优先选择正在被执行的worker_pool(所以,指定了cpu也不一定生效);
WORK_STRUCT_PENDING_BIT何时被设置以及被清0?
当一个work已经加入到workqueue队列中,schedule_work()->queue_work()->queue_work_on()时被设置。
当一个work在工作线程里马上要执行(即执行work之前),worker_thread()->process_on_work()->set_work_pool_and_clear_pend是清0。
上述设置和清0都是在关闭本地中断情况下执行的。
如何判断一个workqueue中的work已经全部执行完成?
添加一个flush work到对应的workqueue中,如果该flush work都已经被执行了,那它之前的work也应该被执行完成了。
如何指定一个work跑到特定的cpu上?
寻找合适的pool_workqueue,优先选择本地CPU对应的pool_workqueue;如果该work正在另一个CPU工作线程池中运行,则优先选择此工作线程池。
如果一个work正在上一次的worker_pool中执行,则本次work也会被放到上次执行的worker_pool中,是为了利用缓存,提高效率。 (即work_queue_on() 不一定能够指定运行的cpu。)
如何指定一个workqueue中,不同work被同时执行的个数?
判断当前pool_workqueue的work活跃数量,如果少于最高限值,就加入pending状态链表worker_pool->worklist,否则加入delayed_works链表中。
如果一个worker_pool中的worker要进入睡眠了,如何保证其余的work能够被执行?
在__schedule() 函数中,会进行处理。
当一个工作线程要被调度器换出时,调用wq_worker_sleeping()看看是否需要唤醒同一个线程池中的其它内核线程。
workqueue lockup机制是怎么样的?
会用一个timer 检查一个work执行了 30s都没有退出,就会大于warning log。
WQ_MEM_RECLAIM 标志表示什么意思?
创建工作队列时,如果设置了WQ_MEM_RECLAIM标志,则会新建rescuer worker,对应rescuer_thread内核线程。
目的是当内存紧张时,新创建worker可能会失败,这时候由rescuer来处理这种情况;
数据结构
work_struct:工作队列调度的最小单位,work item;
workqueue_struct:工作队列,work item都挂入到工作队列中;
worker:work item的处理者,每个worker对应一个内核线程;
worker_pool:worker池(内核线程池),是一个共享资源池,提供不同的worker来对work item进行处理;
pool_workqueue:充当桥梁纽带的作用,用于连接workqueue和worker_pool,建立链接关系;