當前位置:
首頁 > 知識 > 內存管理+ linux內核並發與同步機制

內存管理+ linux內核並發與同步機制

一、臨界資源:

臨界區是指訪問或操作共享資源的代碼段,這些資源無法同時被多個執行線程訪問,為了避免臨界區的並發

訪問,需要保證臨界區的原子性,臨界區不能有多個並發源同時執行,原子性保護的是資源和數據,包括靜態局部

變數、全局變數、共享的數據結構、Buffer緩存等各種資源數據,產生並發訪問的並發源主要有如下:

中斷和異常:中斷髮生後,中斷執行程序和被中斷的進程之間可能產生並發訪問;

軟中斷和tasklet:軟中斷或者tasklet隨時可能會被調度執行,從而打斷當前正在執行的進程上下文;

內核搶佔:進程調度器支持搶佔特性,會導致進程和進程之間的並發訪問;

多處理器並發執行:多個處理器的上可以同時執行多個相同或者不同的進程。

二、同步機制:

上面已經介紹了臨界資源的相關內容,linux內核中的同步機制就是為了保證臨界資源不產生並發訪問的處理

方法,當然內核中針對不同的場景有不同的同步機制,如:原子操作、自旋鎖、信號量、Mutex互斥體、讀寫鎖、

RCU鎖等機制一面會一一介紹,並比較各種機制之間的差別。

1、原子操作:

(1)原子變數操作:

原子操作是指保證指令以原子的方式執行,執行的過程中不會被打斷。linux內核提供了atomic_t類型的原子

變數,變數的定義如下:

typedef struct {

int counter;

} atomic_t;

原子變數的常見的操作介面和用法如下:

#define ATOMIC_INIT(i) { (i) } //定義一個原子變數並初始化為i

#define atomic_read(v) READ_ONCE((v)->counter) //讀取原子變數的值

static inline void atomic_add(int i, atomic_t *v) //原子變數v增加i

static inline void atomic_sub(int i, atomic_t *v) //原子變數v減i

static inline void atomic_inc(atomic_t *v) //原子變數值加1

static inline void atomic_dec(atomic_t *v) //原子變數值減1

......

atomic_t use_cnt;

atomic_set(&use_cnt, 2);

atomic_add(4, &use_cnt);

atomic_inc(use_cnt);

(2)原子位操作:

在編寫代碼時,有時會使用到對某一個寄存器或者變數設置某一位的操作,可以使用如下的介面:

unsigned long word = 0;

set_bit(0, &word); /*第0位被設置*/

set_bit(1, &word); /*第1位被設置*/

clear_bit(1, &word); /*第1位被清空*/

change_bit(0, &word); /*翻轉第0位*/

2、自旋鎖spin_lock:

(1)自旋鎖的特點如下:

a、忙等待、不允許睡眠、快速執行完成,可用於中斷服務程序中;

b、自旋鎖可以保證臨界區不受別的CPU和本CPU的搶佔進程打擾;

c、如果A執行單元首先獲得鎖,那麼當B進入同一個常式時將獲知自旋鎖已被持有,需等待A釋放後才能進入,

所以B只好原地打轉(自旋);

d、自旋鎖鎖定期間不能調用可能引起進程調度的函數,如:copy_from_user(),copy_to_user(), kmalloc(),msleep();

(2)自旋鎖的操作介面:

//定義於#include<linux/spinlock_types.h>

spinlock_t lock; //定義自旋鎖

spin_lock_init(&lock); //初始化自旋鎖

spin_lock(&lock); //如不能獲得鎖,原地打轉。

spin_trylock(&lock);//嘗試獲得,如能獲得鎖返回真,不能獲得返回假,不再原地打轉。

spin_unlock(&lock); //與spin_lock()和spin_trylock()配對使用。

spin_lock_irq()

spin_unlock_irq()

spin_lock_irqsave()

spin_unlock_irqrestore()

spin_lock_bh()

spin_unlock_bh()

(3)使用舉例:

spinlock_t lock; //定義自旋鎖 --全局變數

spin_lock_init(&lock); //初始化自旋鎖 --初始化函數中

spin_lock(&lock); // 獲取自旋鎖 --成對在操作前後使用

//臨界區......

spin_unlock(&lock) //釋放自旋鎖

3、信號量:

(1)信號量的特點:

a、睡眠等待、可以睡眠、可以執行耗時長的進程;

b、共享資源允許被多個不同的進程同時訪問;

c、信號量常用於暫時無法獲取的共享資源,如果獲取失敗則進程進入不可中斷的睡眠狀態,只能由釋放資源的進程來喚醒;

d、信號量不能在中斷服務程序中使用,因為中斷服務程序是不允許進程睡眠的;

(2)信號量的使用:

struct semaphore {

atomic_t count; //共享計數值

int sleepers; //等待當前信號量進入睡眠的進程個數

wait_queue_head_t wait; // wait是當前信號量的等待隊列

};

//count用於判斷是否可以獲取該信號量:

//count大於0說明可以獲取信號量;

//count小於等於0,不可以獲取信號量;

操作介面如下:

static inline void down(struct semaphore * sem)

//獲取信號量,獲取失敗則進入睡眠狀態

static inline void up(struct semaphore * sem)

//釋放信號量,並喚醒等待隊列中的第一個進程

int down_interruptible(struct semaphore * sem);

// down_interruptible能被信號打斷;

int down_trylock(struct semaphore * sem);

//該函數嘗試獲得信號量sem,如果能夠立刻獲得,它就獲得該信號量並返回0,否則,返回非0值。

如:

down(sem);

...臨界區...

up(sem);

4、mutex_lock互斥鎖:

互斥鎖主要用於實現內核中的互斥訪問功能。內核互斥鎖是在原子API之上實現的,但這對於內核用戶是不可見的。

對它的訪問必須遵循一些規則:同一時間只能有一個任務持有互斥鎖,而且只有這個任務可以對互斥鎖進行解鎖。互斥鎖

不能進行遞歸鎖定或解鎖。一個互斥鎖對象必須通過其API初始化,而不能使用memset或複製初始化。一個任務在持有互

斥鎖的時候是不能結束的。互斥鎖所使用的內存區域是不能被釋放的。使用中的互斥鎖是不能被重新初始化的。並且互斥

鎖不能用於中斷上下文。但是互斥鎖比當前的內核信號量選項更快,並且更加緊湊。

(1)互斥體禁止多個線程同時進入受保護的代碼「臨界區」(critical section),因此,在任意時刻,只有一個線程被允許

進入這樣的代碼保護區。mutex實際上是count=1情況下的semaphore。

struct mutex {

atomic_t count;

spinlock_t wait_lock;

struct list_head wait_list;

struct task_struct *owner;

......

};

結構體成員說明:

atomic_t count;指示互斥鎖的狀態:

1 沒有上鎖,可以獲得;0 被鎖定,不能獲得,初始化為沒有上鎖;

spinlock_t wait_lock;

等待獲取互斥鎖中使用的自旋鎖,在獲取互斥鎖的過程中,操作會在自旋鎖的保護中進行,

初始化為為鎖定。

struct list_head wait_list;

等待互斥鎖的進程隊列。

(2)mutex的使用:

a、初始化

mutex_init(&mutex); //動態初始化互斥鎖

DEFINE_MUTEX(mutexname); //靜態定義和初始化互斥鎖

b、上鎖

void mutex_lock(struct mutex *lock);

//無法獲得鎖時,睡眠等待,不會被信號中斷。

int mutex_trylock(struct mutex *lock);

//此函數是 mutex_lock()的非阻塞版本,成功返回1,失敗返回0

int mutex_lock_interruptible(struct mutex *lock);

//和mutex_lock()一樣,也是獲取互斥鎖。在獲得了互斥鎖或進入睡眠直

//到獲得互斥鎖之後會返回0。如果在等待獲取鎖的時候進入睡眠狀態收到一

//個信號(被信號打斷睡眠),則返回_EINIR。

c、解鎖

void mutex_unlock(struct mutex *lock);

5、讀寫鎖:

讀寫鎖實際是一種特殊的自旋鎖,它把對共享資源的訪問者劃分成讀者和寫者,讀者只對共享資源進行讀訪問,

寫者則需要對共享資源進行寫操作。這種鎖相對於自旋鎖而言,能提高並發性,因為在多處理器系統中,它允許同

時有多個讀者來訪問共享資源,最大可能的讀者數為實際的邏輯CPU數。寫者是排他性的,一個讀寫鎖同時只能有

一個寫者或多個讀者(與CPU數相關),但不能同時既有讀者又有寫者。

如果讀寫鎖當前沒有讀者,也沒有寫者,那麼寫者可以立刻獲得讀寫鎖,否則它必須自旋在那裡,直到沒有任

何寫者或讀者。如果讀寫鎖沒有寫者,那麼讀者可以立即獲得該讀寫鎖,否則讀者必須自旋在那裡,直到寫者釋放

該讀寫鎖。

(1)操作函數介面:

rwlock_init(x)

DEFINE_RWLOCK(x)

read_trylock(lock)

write_trylock(lock)

read_lock(lock)

write_lock(lock)

read_unlock(lock)

write_unlock(lock)

(2)總結:

A、讀寫鎖本質上就是一個計數器,初始化值為0x01000000,表示最多可以有0x01000000個讀者同時獲取讀鎖;

B、獲取讀鎖時,rw計數減1,判斷結果的符號位是否為1,若結果符號位為0時,獲取讀鎖成功;

C、獲取讀鎖時,rw計數減1,判斷結果的符號位是否為1。若結果符號位為1時,獲取讀鎖失敗,表示此時讀寫鎖被寫者

佔有,此時調用__read_lock_failed失敗處理函數,循環測試rw+1的值,直到結果的值大於等於1;

D、獲取寫鎖時,rw計數減RW_LOCK_BIAS_STR,即rw-0x01000000,判斷結果是否為0。若結果為0時,獲取寫鎖成功;

E、獲取寫鎖時,rw計數減RW_LOCK_BIAS_STR,即rw-0x01000000,判斷結果是否為0。若結果不為0時,獲取寫鎖失敗,

表示此時有讀者佔有讀寫鎖或有寫著佔有讀寫鎖,此時調用__write_lock_failed失敗處理函數,循環測試rw+0x01000000,

直到結果的值等於0x01000000;

F、通過對讀寫鎖的源代碼分析,可以看出讀寫鎖其實是帶計數的特殊自旋鎖,能同時被多個讀者佔有或一個寫者佔有,

但不能同時被讀者和寫者佔有。

6、讀寫信號量:

(1)特點:

a、同一時刻最多有一個寫者(writer)獲得鎖;

b、同一時刻可以有多個讀者(reader)獲得鎖;

c、同一時刻寫者和讀者不能同時獲得鎖;

(2)相關結構和函數介面:

struct rw_semaphore {

/*讀/寫信號量定義:

* - 如果activity為0,那麼沒有激活的讀者或寫者。

* - 如果activity為+ve,那麼將有ve個激活的讀者。

* - 如果activity為-1,那麼將有1個激活的寫者。 */

__s32 activity; /*信號量值*/

spinlock_t wait_lock; /*用於鎖等待隊列wait_list*/

struct list_head wait_list; /*如果非空,表示有進程等待該信號量*/

};

void init_rwsem(struct rw_semaphore* rw_sem); //初始化讀寫信號量

void down_read(struct rw_semaphore* rw_sem); //獲取讀信號量

int down_read_trylock(struct rw_semaphore* rw_sem); //嘗試獲取讀信號量

void up_read(struct rw_semaphore* rw_sem);

void down_write(struct rw_semaphore* rw_sem); //獲取寫信號量

int down_write_trylock(struct rw_semaphore* rw_sem);//嘗試獲取寫信號量

void up_write(struct rw_semaphore* rw_sem);

(3)使用方法:

rw_semaphore sem;

init_rwsem(&sem);

down_read(&sem);

...臨界區...

up_read(&sem);

down_write(&sem);

...臨界區...

up_write(&sem);

三、總結與拓展

1、針對上面的各種同步機制直接的差異,可以如下表格清晰的表明:

2、在解決穩定性相關的問題時,難免會出現一些死鎖的問題,可以添加一些debug宏來配合調試:

CONFIG_LOCK_STAT=y

CONFIG_DEBUG_LOCKDEP=y

CONFIG_PROVE_LOCKING=y

CONFIG_DEBUG_MUTEXES=y

CONFIG_DEBUG_LOCK_ALLOC=y

CONFIG_RWSEM_SPIN_ON_OWNER=y

如下有一個簡單的死鎖的常式:

static DEFINE_SPINLOCK(hack_spinA);

static DEFINE_SPINLOCK(hack_spinB);

void hack_spinBA(void)

{

printk("%s(),hack A and B
",__FUNCTION__);

spin_lock(&hack_spinA);

spin_lock(&hack_spinB);

}

void hack_spinAB(void)

{

printk("%s(),hack A
",__FUNCTION__);

spin_lock(&hack_spinA);

spin_lock(&hack_spinB);

}

static ssize_t hello_test_show(struct device *dev, struct device_attribute *attr, char *buf)

{

printk("%s()
",__FUNCTION__);

hack_spinBA();

return 0;

}

static ssize_t hello_test_store(struct device *dev, struct device_attribute *attr, const char *buf, size_t count)

{

int test;

printk("hello_test %s,%d
",__FUNCTION__,__LINE__);

test = 4;

hack_spinAB();

return printk("%s() test = %d
",__FUNCTION__,test);

}

static DEVICE_ATTR(hello_test, 0664, hello_test_show, hello_test_store);

static int hello_test_probe(struct platform_device *pdev)

{

printk("%s()
",__FUNCTION__);

device_create_file(&pdev->dev, &dev_attr_hello_test);

return 0;

}

上面的例子中,先cat對應的節點會先後拿住hack_spinA和hack_spinB的兩把鎖,但並沒有去釋放這兩把鎖,

所以再對應的相應節點做echo操作時,會出現一直無法拿住這把鎖,等待30S後會觸發HWT的重啟,看重啟的DB

文件可以發現:

/***********在58S時通過cat 執行到了hello_test_show函數,從而拿住了鎖********/

[ 58.453863] (1)[2577:cat]Dump cpuinfo

[ 58.456057] (1)[2577:cat]hello_test_show()

[ 58.456093] (1)[2577:cat]hack_spinBA(),hack A and B

[ 58.457426] (1)[2577:cat]note: cat[2577] exited with preempt_count 2

[ 58.458884] (1)[2577:cat]

[ 58.458920] (1)[2577:cat]=====================================

[ 58.458931] (1)[2577:cat][ BUG: cat/2577 still has locks held! ]

[ 58.458944] (1)[2577:cat]4.4.146+ #5 Tainted: G W O

[ 58.458955] (1)[2577:cat]-------------------------------------

[ 58.458965] (1)[2577:cat]lockdep: [Caution!] cat/2577 is runable state

[ 58.458976] (1)[2577:cat]lockdep: 2 locks held by cat/2577:

[ 58.458988] #0: (hack_spinA){......}

[ 58.459015] lockdep: , at: (1)[2577:cat][<c0845e8c>] hack_spinBA+0x24/0x3c

[ 58.459049] #1: (hack_spinB){......}

[ 58.459074] lockdep: , at: (1)[2577:cat][<c0845e94>] hack_spinBA+0x2c/0x3c

[ 58.459098] (1)[2577:cat]

/**********在64S是通過echo指令調用到hello_test_store再次去拿鎖,導致死鎖******/

[ 64.083878] (2)[1906:sh]hello_test hello_test_store,41

[ 64.083914] (2)[1906:sh]hack_spinAB(),hack A

/**********在94S時出現 HWT 重啟***************/

[ 94.092084] -(2)[1906:sh][<c0836a40>] (aee_wdt_atf_info) from [<c0837428>] (aee_wdt_atf_entry+0x180/0x1b8)

[ 94.093571] -(2)[1906:sh][<c08372a8>] (aee_wdt_atf_entry) from [<c0152bfc>] (preempt_count_sub+0xe4/0x100)

[ 94.095055] -(2)[1906:sh][<c019694c>] (do_raw_spin_lock) from [<c0c6f5f8>] (_raw_spin_lock+0x48/0x50)

[ 94.096548] -(2)[1906:sh][<c0c6f5b0>] (_raw_spin_lock) from [<c0845ef4>] (hack_spinAB+0x24/0x3c)

[ 94.097666] -(2)[1906:sh][<c0845ed0>] (hack_spinAB) from [<c0845f30>] (hello_test_store+0x24/0x44)

[ 94.098785] -(2)[1906:sh][<c0845f0c>] (hello_test_store) from [<c04c56cc>] (dev_attr_store+0x20/0x2c)

再去看CPU的喂狗信息:kick=0x0000000b,check=0x0000000f,

可以發現是因為CPU2死鎖導致沒有及時喂狗,所以觸發HWT重啟,看CPU2的堆棧也是掛載sh的進程上:

cpu#2: Online

.nr_running : 5

.load

runnable tasks:

task PID tree-key switches prio wait-time sum-exec sum-sleep

---------------------------------------------------

DispSync 470 0.000000 1054 97 0.000000 157.709688 0.000000 /

kworker/2:2 1083 56925.416956 264 120 119.050538 21.742080 35541.090392 /

R sh 1906 84650.355638 115 120 14.202615 28671.890146 21283.409357 /

內存管理+ linux內核並發與同步機制

打開今日頭條,查看更多精彩圖片
喜歡這篇文章嗎?立刻分享出去讓更多人知道吧!

本站內容充實豐富,博大精深,小編精選每日熱門資訊,隨時更新,點擊「搶先收到最新資訊」瀏覽吧!


請您繼續閱讀更多來自 程序員小新人學習 的精彩文章:

線上單台Eureka升級到3台Eureka高可用
理解foreach, for in, for of 之間的異同

TAG:程序員小新人學習 |