內存管理+ linux內核並發與同步機制
一、臨界資源:
臨界區是指訪問或操作共享資源的代碼段,這些資源無法同時被多個執行線程訪問,為了避免臨界區的並發
訪問,需要保證臨界區的原子性,臨界區不能有多個並發源同時執行,原子性保護的是資源和數據,包括靜態局部
變數、全局變數、共享的數據結構、Buffer緩存等各種資源數據,產生並發訪問的並發源主要有如下:
中斷和異常:中斷髮生後,中斷執行程序和被中斷的進程之間可能產生並發訪問;
軟中斷和tasklet:軟中斷或者tasklet隨時可能會被調度執行,從而打斷當前正在執行的進程上下文;
內核搶佔:進程調度器支持搶佔特性,會導致進程和進程之間的並發訪問;
多處理器並發執行:多個處理器的上可以同時執行多個相同或者不同的進程。
二、同步機制:
上面已經介紹了臨界資源的相關內容,linux內核中的同步機制就是為了保證臨界資源不產生並發訪問的處理
方法,當然內核中針對不同的場景有不同的同步機制,如:原子操作、自旋鎖、信號量、Mutex互斥體、讀寫鎖、
RCU鎖等機制一面會一一介紹,並比較各種機制之間的差別。
1、原子操作:
(1)原子變數操作:
原子操作是指保證指令以原子的方式執行,執行的過程中不會被打斷。linux內核提供了atomic_t類型的原子
變數,變數的定義如下:
typedef struct {
int counter;
} atomic_t;
原子變數的常見的操作介面和用法如下:
#define ATOMIC_INIT(i) { (i) } //定義一個原子變數並初始化為i
#define atomic_read(v) READ_ONCE((v)->counter) //讀取原子變數的值
static inline void atomic_add(int i, atomic_t *v) //原子變數v增加i
static inline void atomic_sub(int i, atomic_t *v) //原子變數v減i
static inline void atomic_inc(atomic_t *v) //原子變數值加1
static inline void atomic_dec(atomic_t *v) //原子變數值減1
......
atomic_t use_cnt;
atomic_set(&use_cnt, 2);
atomic_add(4, &use_cnt);
atomic_inc(use_cnt);
(2)原子位操作:
在編寫代碼時,有時會使用到對某一個寄存器或者變數設置某一位的操作,可以使用如下的介面:
unsigned long word = 0;
set_bit(0, &word); /*第0位被設置*/
set_bit(1, &word); /*第1位被設置*/
clear_bit(1, &word); /*第1位被清空*/
change_bit(0, &word); /*翻轉第0位*/
2、自旋鎖spin_lock:
(1)自旋鎖的特點如下:
a、忙等待、不允許睡眠、快速執行完成,可用於中斷服務程序中;
b、自旋鎖可以保證臨界區不受別的CPU和本CPU的搶佔進程打擾;
c、如果A執行單元首先獲得鎖,那麼當B進入同一個常式時將獲知自旋鎖已被持有,需等待A釋放後才能進入,
所以B只好原地打轉(自旋);
d、自旋鎖鎖定期間不能調用可能引起進程調度的函數,如:copy_from_user(),copy_to_user(), kmalloc(),msleep();
(2)自旋鎖的操作介面:
//定義於#include<linux/spinlock_types.h>
spinlock_t lock; //定義自旋鎖
spin_lock_init(&lock); //初始化自旋鎖
spin_lock(&lock); //如不能獲得鎖,原地打轉。
spin_trylock(&lock);//嘗試獲得,如能獲得鎖返回真,不能獲得返回假,不再原地打轉。
spin_unlock(&lock); //與spin_lock()和spin_trylock()配對使用。
spin_lock_irq()
spin_unlock_irq()
spin_lock_irqsave()
spin_unlock_irqrestore()
spin_lock_bh()
spin_unlock_bh()
(3)使用舉例:
spinlock_t lock; //定義自旋鎖 --全局變數
spin_lock_init(&lock); //初始化自旋鎖 --初始化函數中
spin_lock(&lock); // 獲取自旋鎖 --成對在操作前後使用
//臨界區......
spin_unlock(&lock) //釋放自旋鎖
3、信號量:
(1)信號量的特點:
a、睡眠等待、可以睡眠、可以執行耗時長的進程;
b、共享資源允許被多個不同的進程同時訪問;
c、信號量常用於暫時無法獲取的共享資源,如果獲取失敗則進程進入不可中斷的睡眠狀態,只能由釋放資源的進程來喚醒;
d、信號量不能在中斷服務程序中使用,因為中斷服務程序是不允許進程睡眠的;
(2)信號量的使用:
struct semaphore {
atomic_t count; //共享計數值
int sleepers; //等待當前信號量進入睡眠的進程個數
wait_queue_head_t wait; // wait是當前信號量的等待隊列
};
//count用於判斷是否可以獲取該信號量:
//count大於0說明可以獲取信號量;
//count小於等於0,不可以獲取信號量;
操作介面如下:
static inline void down(struct semaphore * sem)
//獲取信號量,獲取失敗則進入睡眠狀態
static inline void up(struct semaphore * sem)
//釋放信號量,並喚醒等待隊列中的第一個進程
int down_interruptible(struct semaphore * sem);
// down_interruptible能被信號打斷;
int down_trylock(struct semaphore * sem);
//該函數嘗試獲得信號量sem,如果能夠立刻獲得,它就獲得該信號量並返回0,否則,返回非0值。
如:
down(sem);
...臨界區...
up(sem);
4、mutex_lock互斥鎖:
互斥鎖主要用於實現內核中的互斥訪問功能。內核互斥鎖是在原子API之上實現的,但這對於內核用戶是不可見的。
對它的訪問必須遵循一些規則:同一時間只能有一個任務持有互斥鎖,而且只有這個任務可以對互斥鎖進行解鎖。互斥鎖
不能進行遞歸鎖定或解鎖。一個互斥鎖對象必須通過其API初始化,而不能使用memset或複製初始化。一個任務在持有互
斥鎖的時候是不能結束的。互斥鎖所使用的內存區域是不能被釋放的。使用中的互斥鎖是不能被重新初始化的。並且互斥
鎖不能用於中斷上下文。但是互斥鎖比當前的內核信號量選項更快,並且更加緊湊。
(1)互斥體禁止多個線程同時進入受保護的代碼「臨界區」(critical section),因此,在任意時刻,只有一個線程被允許
進入這樣的代碼保護區。mutex實際上是count=1情況下的semaphore。
struct mutex {
atomic_t count;
spinlock_t wait_lock;
struct list_head wait_list;
struct task_struct *owner;
......
};
結構體成員說明:
atomic_t count;指示互斥鎖的狀態:
1 沒有上鎖,可以獲得;0 被鎖定,不能獲得,初始化為沒有上鎖;
spinlock_t wait_lock;
等待獲取互斥鎖中使用的自旋鎖,在獲取互斥鎖的過程中,操作會在自旋鎖的保護中進行,
初始化為為鎖定。
struct list_head wait_list;
等待互斥鎖的進程隊列。
(2)mutex的使用:
a、初始化
mutex_init(&mutex); //動態初始化互斥鎖
DEFINE_MUTEX(mutexname); //靜態定義和初始化互斥鎖
b、上鎖
void mutex_lock(struct mutex *lock);
//無法獲得鎖時,睡眠等待,不會被信號中斷。
int mutex_trylock(struct mutex *lock);
//此函數是 mutex_lock()的非阻塞版本,成功返回1,失敗返回0
int mutex_lock_interruptible(struct mutex *lock);
//和mutex_lock()一樣,也是獲取互斥鎖。在獲得了互斥鎖或進入睡眠直
//到獲得互斥鎖之後會返回0。如果在等待獲取鎖的時候進入睡眠狀態收到一
//個信號(被信號打斷睡眠),則返回_EINIR。
c、解鎖
void mutex_unlock(struct mutex *lock);
5、讀寫鎖:
讀寫鎖實際是一種特殊的自旋鎖,它把對共享資源的訪問者劃分成讀者和寫者,讀者只對共享資源進行讀訪問,
寫者則需要對共享資源進行寫操作。這種鎖相對於自旋鎖而言,能提高並發性,因為在多處理器系統中,它允許同
時有多個讀者來訪問共享資源,最大可能的讀者數為實際的邏輯CPU數。寫者是排他性的,一個讀寫鎖同時只能有
一個寫者或多個讀者(與CPU數相關),但不能同時既有讀者又有寫者。
如果讀寫鎖當前沒有讀者,也沒有寫者,那麼寫者可以立刻獲得讀寫鎖,否則它必須自旋在那裡,直到沒有任
何寫者或讀者。如果讀寫鎖沒有寫者,那麼讀者可以立即獲得該讀寫鎖,否則讀者必須自旋在那裡,直到寫者釋放
該讀寫鎖。
(1)操作函數介面:
rwlock_init(x)
DEFINE_RWLOCK(x)
read_trylock(lock)
write_trylock(lock)
read_lock(lock)
write_lock(lock)
read_unlock(lock)
write_unlock(lock)
(2)總結:
A、讀寫鎖本質上就是一個計數器,初始化值為0x01000000,表示最多可以有0x01000000個讀者同時獲取讀鎖;
B、獲取讀鎖時,rw計數減1,判斷結果的符號位是否為1,若結果符號位為0時,獲取讀鎖成功;
C、獲取讀鎖時,rw計數減1,判斷結果的符號位是否為1。若結果符號位為1時,獲取讀鎖失敗,表示此時讀寫鎖被寫者
佔有,此時調用__read_lock_failed失敗處理函數,循環測試rw+1的值,直到結果的值大於等於1;
D、獲取寫鎖時,rw計數減RW_LOCK_BIAS_STR,即rw-0x01000000,判斷結果是否為0。若結果為0時,獲取寫鎖成功;
E、獲取寫鎖時,rw計數減RW_LOCK_BIAS_STR,即rw-0x01000000,判斷結果是否為0。若結果不為0時,獲取寫鎖失敗,
表示此時有讀者佔有讀寫鎖或有寫著佔有讀寫鎖,此時調用__write_lock_failed失敗處理函數,循環測試rw+0x01000000,
直到結果的值等於0x01000000;
F、通過對讀寫鎖的源代碼分析,可以看出讀寫鎖其實是帶計數的特殊自旋鎖,能同時被多個讀者佔有或一個寫者佔有,
但不能同時被讀者和寫者佔有。
6、讀寫信號量:
(1)特點:
a、同一時刻最多有一個寫者(writer)獲得鎖;
b、同一時刻可以有多個讀者(reader)獲得鎖;
c、同一時刻寫者和讀者不能同時獲得鎖;
(2)相關結構和函數介面:
struct rw_semaphore {
/*讀/寫信號量定義:
* - 如果activity為0,那麼沒有激活的讀者或寫者。
* - 如果activity為+ve,那麼將有ve個激活的讀者。
* - 如果activity為-1,那麼將有1個激活的寫者。 */
__s32 activity; /*信號量值*/
spinlock_t wait_lock; /*用於鎖等待隊列wait_list*/
struct list_head wait_list; /*如果非空,表示有進程等待該信號量*/
};
void init_rwsem(struct rw_semaphore* rw_sem); //初始化讀寫信號量
void down_read(struct rw_semaphore* rw_sem); //獲取讀信號量
int down_read_trylock(struct rw_semaphore* rw_sem); //嘗試獲取讀信號量
void up_read(struct rw_semaphore* rw_sem);
void down_write(struct rw_semaphore* rw_sem); //獲取寫信號量
int down_write_trylock(struct rw_semaphore* rw_sem);//嘗試獲取寫信號量
void up_write(struct rw_semaphore* rw_sem);
(3)使用方法:
rw_semaphore sem;
init_rwsem(&sem);
down_read(&sem);
...臨界區...
up_read(&sem);
down_write(&sem);
...臨界區...
up_write(&sem);
三、總結與拓展
1、針對上面的各種同步機制直接的差異,可以如下表格清晰的表明:
2、在解決穩定性相關的問題時,難免會出現一些死鎖的問題,可以添加一些debug宏來配合調試:
CONFIG_LOCK_STAT=y
CONFIG_DEBUG_LOCKDEP=y
CONFIG_PROVE_LOCKING=y
CONFIG_DEBUG_MUTEXES=y
CONFIG_DEBUG_LOCK_ALLOC=y
CONFIG_RWSEM_SPIN_ON_OWNER=y
如下有一個簡單的死鎖的常式:
static DEFINE_SPINLOCK(hack_spinA);
static DEFINE_SPINLOCK(hack_spinB);
void hack_spinBA(void)
{
printk("%s(),hack A and B
",__FUNCTION__);
spin_lock(&hack_spinA);
spin_lock(&hack_spinB);
}
void hack_spinAB(void)
{
printk("%s(),hack A
",__FUNCTION__);
spin_lock(&hack_spinA);
spin_lock(&hack_spinB);
}
static ssize_t hello_test_show(struct device *dev, struct device_attribute *attr, char *buf)
{
printk("%s()
",__FUNCTION__);
hack_spinBA();
return 0;
}
static ssize_t hello_test_store(struct device *dev, struct device_attribute *attr, const char *buf, size_t count)
{
int test;
printk("hello_test %s,%d
",__FUNCTION__,__LINE__);
test = 4;
hack_spinAB();
return printk("%s() test = %d
",__FUNCTION__,test);
}
static DEVICE_ATTR(hello_test, 0664, hello_test_show, hello_test_store);
static int hello_test_probe(struct platform_device *pdev)
{
printk("%s()
",__FUNCTION__);
device_create_file(&pdev->dev, &dev_attr_hello_test);
return 0;
}
上面的例子中,先cat對應的節點會先後拿住hack_spinA和hack_spinB的兩把鎖,但並沒有去釋放這兩把鎖,
所以再對應的相應節點做echo操作時,會出現一直無法拿住這把鎖,等待30S後會觸發HWT的重啟,看重啟的DB
文件可以發現:
/***********在58S時通過cat 執行到了hello_test_show函數,從而拿住了鎖********/
[ 58.453863] (1)[2577:cat]Dump cpuinfo
[ 58.456057] (1)[2577:cat]hello_test_show()
[ 58.456093] (1)[2577:cat]hack_spinBA(),hack A and B
[ 58.457426] (1)[2577:cat]note: cat[2577] exited with preempt_count 2
[ 58.458884] (1)[2577:cat]
[ 58.458920] (1)[2577:cat]=====================================
[ 58.458931] (1)[2577:cat][ BUG: cat/2577 still has locks held! ]
[ 58.458944] (1)[2577:cat]4.4.146+ #5 Tainted: G W O
[ 58.458955] (1)[2577:cat]-------------------------------------
[ 58.458965] (1)[2577:cat]lockdep: [Caution!] cat/2577 is runable state
[ 58.458976] (1)[2577:cat]lockdep: 2 locks held by cat/2577:
[ 58.458988] #0: (hack_spinA){......}
[ 58.459015] lockdep: , at: (1)[2577:cat][<c0845e8c>] hack_spinBA+0x24/0x3c
[ 58.459049] #1: (hack_spinB){......}
[ 58.459074] lockdep: , at: (1)[2577:cat][<c0845e94>] hack_spinBA+0x2c/0x3c
[ 58.459098] (1)[2577:cat]
/**********在64S是通過echo指令調用到hello_test_store再次去拿鎖,導致死鎖******/
[ 64.083878] (2)[1906:sh]hello_test hello_test_store,41
[ 64.083914] (2)[1906:sh]hack_spinAB(),hack A
/**********在94S時出現 HWT 重啟***************/
[ 94.092084] -(2)[1906:sh][<c0836a40>] (aee_wdt_atf_info) from [<c0837428>] (aee_wdt_atf_entry+0x180/0x1b8)
[ 94.093571] -(2)[1906:sh][<c08372a8>] (aee_wdt_atf_entry) from [<c0152bfc>] (preempt_count_sub+0xe4/0x100)
[ 94.095055] -(2)[1906:sh][<c019694c>] (do_raw_spin_lock) from [<c0c6f5f8>] (_raw_spin_lock+0x48/0x50)
[ 94.096548] -(2)[1906:sh][<c0c6f5b0>] (_raw_spin_lock) from [<c0845ef4>] (hack_spinAB+0x24/0x3c)
[ 94.097666] -(2)[1906:sh][<c0845ed0>] (hack_spinAB) from [<c0845f30>] (hello_test_store+0x24/0x44)
[ 94.098785] -(2)[1906:sh][<c0845f0c>] (hello_test_store) from [<c04c56cc>] (dev_attr_store+0x20/0x2c)
再去看CPU的喂狗信息:kick=0x0000000b,check=0x0000000f,
可以發現是因為CPU2死鎖導致沒有及時喂狗,所以觸發HWT重啟,看CPU2的堆棧也是掛載sh的進程上:
cpu#2: Online
.nr_running : 5
.load
runnable tasks:
task PID tree-key switches prio wait-time sum-exec sum-sleep
---------------------------------------------------
DispSync 470 0.000000 1054 97 0.000000 157.709688 0.000000 /
kworker/2:2 1083 56925.416956 264 120 119.050538 21.742080 35541.090392 /
R sh 1906 84650.355638 115 120 14.202615 28671.890146 21283.409357 /
打開今日頭條,查看更多精彩圖片※線上單台Eureka升級到3台Eureka高可用
※理解foreach, for in, for of 之間的異同
TAG:程序員小新人學習 |