當前位置:
首頁 > 知識 > 緩存架構SpringBoot集成Curator實現zookeeper分散式鎖

緩存架構SpringBoot集成Curator實現zookeeper分散式鎖

這篇文章其實是下篇文章緩存架構之實戰演練基於zk分散式鎖解決分散式緩存並發衝突問題做理論鋪墊的,下篇文章我們就會利用該工具解決分散式緩存並發衝突問題,就是下面這個架構,下篇文章我們會重點討論:

緩存架構SpringBoot集成Curator實現zookeeper分散式鎖

一、分散式鎖簡介

1、什麼是鎖

  • 在單機環境下,當存在多個線程可以同時改變某個共享變數時,就需要同步來實現該功能,使其線程安全。
  • 而同步就是通過鎖來實現的。鎖保證了同一時刻只有一個線程來修改共享變數。

在單機環境下,Java提供了一些並發安全包可以一定程度上保證線程安全,但是在分散式環境(多機環境)下,這些並發包顯得就無能為力了!!

2、什麼是分散式

分散式的CAP理論:

任何一個分散式系統都無法同時滿足一致性(Consistency)、可用性(Availability)和分區容錯性(Partition tolerance),最多只能同時滿足兩項。

目前很多大型網站及應用都是分散式部署的,分散式場景中的數據一致性問題一直是一個比較重要的話題。基於 CAP理論,很多系統在設計之初就要對這三者做出取捨。在互聯網領域的絕大多數的場景中,都需要犧牲強一致性來換取系統的高可用性,系統往往只需要保證最終一致性。

3、什麼是分散式鎖

顧名思義,分散式鎖肯定是用在分散式環境下。在分散式環境下,使用分散式鎖的目的也是保證同一時刻只有一個線程來修改共享變數,修改共享緩存……。


下篇文章,我們將分享一個實戰案例,就是:緩存架構之實戰演練基於zk分散式鎖解決分散式緩存並發衝突問題

二、原生zookeeper實現分散式鎖

import org.apache.zookeeper.CreateMode;
import org.apache.zookeeper.WatchedEvent;
import org.apache.zookeeper.Watcher;
import org.apache.zookeeper.Watcher.Event.KeeperState;
import org.apache.zookeeper.ZooDefs.Ids;
import org.apache.zookeeper.ZooKeeper;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.util.concurrent.CountDownLatch;
/**
* zookeeper工具類:
*
* 更多免費資料,更多高清內容,更多java技術,歡迎訪問網站
* 極客慧:www.jikeh.cn
* 如果你希望進一步深入交流,請加入我們的大家庭QQ群:375412858
*/
public class ZooKeeperSession {
private static final Logger logger = LoggerFactory.getLogger(ZooKeeperSession.class);
private static CountDownLatch connectedSemaphore = new CountDownLatch(1);
private ZooKeeper zookeeper;
public ZooKeeperSession() {
// 連接zookeeper server,是非同步創建會話的,那我們怎麼知道zk session建立成功了呢?
// 通過一個監聽器+CountDownLatch,來確認真正建立了zk server的連接
try {
this.zookeeper = new ZooKeeper(
"localhost:2181",
50000,
new ZooKeeperWatcher());
//列印即使狀態:驗證其是不是非同步的?
logger.info(String.valueOf(zookeeper.getState()));
try {
// CountDownLatch:簡而言之 初始化——非0;非0——等待;0——往下執行
connectedSemaphore.await();
} catch(InterruptedException e) {
e.printStackTrace();
}
logger.info("ZooKeeper session established......");
} catch (Exception e) {
e.printStackTrace();
}
}
/**
* 初始化實例:
*/
public static void init() {
getInstance();
}
/**
* 建立zk session的watcher:
*/
private class ZooKeeperWatcher implements Watcher {
public void process(WatchedEvent event) {
if(KeeperState.SyncConnected == event.getState()) {
connectedSemaphore.countDown();
}
}
}
/**
* 靜態內部類實現單例:
*/
private static class Singleton {
private static ZooKeeperSession instance;
static {
instance = new ZooKeeperSession();
}
public static ZooKeeperSession getInstance() {
return instance;
}
}
/**
* 獲取單例:
*
* @return
*/
public static ZooKeeperSession getInstance() {
return Singleton.getInstance();
}
/**
* 重試獲取分散式鎖:
*
* @param adId
*/
public void acquireDistributedLock(Long adId) {
String path = "/ad-lock-" + adId;
try {
zookeeper.create(path, "".getBytes(), Ids.OPEN_ACL_UNSAFE, CreateMode.EPHEMERAL);
logger.info("success to acquire lock for adId = " + adId);
} catch (Exception e) {
// 如果那個廣告對應的鎖node,已經存在了,就是已經被別人加鎖了,那麼就這裡就會報錯
// NodeExistsException
int count = 0;
while(true) {
try {
Thread.sleep(1000);
zookeeper.create(path, "".getBytes(), Ids.OPEN_ACL_UNSAFE, CreateMode.EPHEMERAL);
} catch (Exception e2) {
count++;
logger.info("the " + count + " times try to acquire lock for adId = " + adId);
continue;
}
logger.info("success to acquire lock for adId = " + adId + " after " + count + " times try......");
break;
}
}
}
/**
* 釋放掉分散式鎖:
*
* @param adId
*/
public void releaseDistributedLock(Long adId) {
String path = "/ad-lock-" + adId;
try {
zookeeper.delete(path, -1);
logger.info("release the lock for adId = " + adId);
} catch (Exception e) {
e.printStackTrace();
}
}
public static void main(String[] args) {
Long adId = 1L;
ZooKeeperSession zkSession = ZooKeeperSession.getInstance();
//1、獲取鎖:
zkSession.acquireDistributedLock(adId);
//2、執行一些修改共享資源的操作
logger.info("I am updating common resource!");
//3、釋放鎖
zkSession.releaseDistributedLock(adId);
}
}

三、SpringBoot集成Curator實現zookeeper分散式鎖

1、Curator簡介

Apache Curator是Netflix公司開源的一個Zookeeper客戶端,目前已經是Apache的頂級項目,與Zookeeper提供的原生客戶端相比,Curator的抽象層次更高,簡化了Zookeeper客戶端的開發量,通過封裝的一套高級API,裡面提供了更多豐富的操作,例如session超時重連、主從選舉、分散式計數器、分散式鎖等等適用於各種複雜場景的zookeeper操作。

2、SpringBoot集成Curator實現zk分散式鎖

1)引入pom依賴

<!-- zookeeper -->
<dependency>
<groupId>org.apache.zookeeper</groupId>
<artifactId>zookeeper</artifactId>
<version>3.4.10</version>
<exclusions>
<exclusion>
<groupId>org.slf4j</groupId>
<artifactId>slf4j-log4j12</artifactId>
</exclusion>
<exclusion>
<groupId>org.slf4j</groupId>
<artifactId>slf4j-api</artifactId>
</exclusion>
<exclusion>
<artifactId>log4j</artifactId>
<groupId>log4j</groupId>
</exclusion>
</exclusions>
</dependency>
<dependency>
<groupId>org.apache.curator</groupId>
<artifactId>curator-framework</artifactId>
<version>2.12.0</version>
<exclusions>
<exclusion>
<groupId>org.slf4j</groupId>
<artifactId>slf4j-api</artifactId>
</exclusion>
</exclusions>
</dependency>
<dependency>
<groupId>org.apache.curator</groupId>
<artifactId>curator-recipes</artifactId>
<version>2.12.0</version>
<exclusions>
<exclusion>
<groupId>org.slf4j</groupId>
<artifactId>slf4j-log4j12</artifactId>
</exclusion>
</exclusions>
</dependency>

2)基本配置

#重試次數
curator.retryCount=5
#重試間隔時間
curator.elapsedTimeMs=5000
# zookeeper 地址
curator.connectString=127.0.0.1:2181
# session超時時間
curator.sessionTimeoutMs=60000
# 連接超時時間
curator.connectionTimeoutMs=5000

3)連接配置

package com.jikeh.config;
import org.apache.curator.framework.CuratorFramework;
import org.apache.curator.framework.CuratorFrameworkFactory;
import org.apache.curator.retry.RetryNTimes;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
@Configuration
public class CuratorConfiguration {
@Value("${curator.retryCount}")
private int retryCount;
@Value("${curator.elapsedTimeMs}")
private int elapsedTimeMs;
@Value("${curator.connectString}")
private String connectString;
@Value("${curator.sessionTimeoutMs}")
private int sessionTimeoutMs;
@Value("${curator.connectionTimeoutMs}")
private int connectionTimeoutMs;
@Bean(initMethod = "start")
public CuratorFramework curatorFramework() {
return CuratorFrameworkFactory.newClient(
connectString,
sessionTimeoutMs,
connectionTimeoutMs,
new RetryNTimes(retryCount, elapsedTimeMs));
}
}

4)Curator實現zk分散式鎖工具類

package com.jikeh.lock;
import org.apache.curator.framework.CuratorFramework;
import org.apache.curator.framework.recipes.cache.PathChildrenCache;
import org.apache.curator.framework.recipes.cache.PathChildrenCacheEvent;
import org.apache.zookeeper.CreateMode;
import org.apache.zookeeper.ZooDefs;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.InitializingBean;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Service;
import java.util.concurrent.CountDownLatch;
@Service
public class DistributedLockByCurator implements InitializingBean{
private static final Logger logger = LoggerFactory.getLogger(DistributedLockByCurator.class);
private final static String ROOT_PATH_LOCK = "rootlock";
private CountDownLatch countDownLatch = new CountDownLatch(1);
@Autowired
private CuratorFramework curatorFramework;
/**
* 獲取分散式鎖
*/
public void acquireDistributedLock(String path) {
String keyPath = "/" + ROOT_PATH_LOCK + "/" + path;
while (true) {
try {
curatorFramework
.create()
.creatingParentsIfNeeded()
.withMode(CreateMode.EPHEMERAL)
.withACL(ZooDefs.Ids.OPEN_ACL_UNSAFE)
.forPath(keyPath);
logger.info("success to acquire lock for path:{}", keyPath);
break;
} catch (Exception e) {
logger.info("failed to acquire lock for path:{}", keyPath);
logger.info("while try again .......");
try {
if (countDownLatch.getCount() <= 0) {
countDownLatch = new CountDownLatch(1);
}
countDownLatch.await();
} catch (InterruptedException e1) {
e1.printStackTrace();
}
}
}
}
/**
* 釋放分散式鎖
*/
public boolean releaseDistributedLock(String path) {
try {
String keyPath = "/" + ROOT_PATH_LOCK + "/" + path;
if (curatorFramework.checkExists().forPath(keyPath) != null) {
curatorFramework.delete().forPath(keyPath);
}
} catch (Exception e) {
logger.error("failed to release lock");
return false;
}
return true;
}
/**
* 創建 watcher 事件
*/
private void addWatcher(String path) throws Exception {
String keyPath;
if (path.equals(ROOT_PATH_LOCK)) {
keyPath = "/" + path;
} else {
keyPath = "/" + ROOT_PATH_LOCK + "/" + path;
}
final PathChildrenCache cache = new PathChildrenCache(curatorFramework, keyPath, false);
cache.start(PathChildrenCache.StartMode.POST_INITIALIZED_EVENT);
cache.getListenable().addListener((client, event) -> {
if (event.getType().equals(PathChildrenCacheEvent.Type.CHILD_REMOVED)) {
String oldPath = event.getData().getPath();
logger.info("success to release lock for path:{}", oldPath);
if (oldPath.contains(path)) {
//釋放計數器,讓當前的請求獲取鎖
countDownLatch.countDown();
}
}
});
}
//創建父節點,並創建永久節點
@Override
public void afterPropertiesSet() {
curatorFramework = curatorFramework.usingNamespace("lock-namespace");
String path = "/" + ROOT_PATH_LOCK;
try {
if (curatorFramework.checkExists().forPath(path) == null) {
curatorFramework.create()
.creatingParentsIfNeeded()
.withMode(CreateMode.PERSISTENT)
.withACL(ZooDefs.Ids.OPEN_ACL_UNSAFE)
.forPath(path);
}
addWatcher(ROOT_PATH_LOCK);
logger.info("root path 的 watcher 事件創建成功");
} catch (Exception e) {
logger.error("connect zookeeper fail,please check the log >> {}", e.getMessage(), e);
}
}
}

5)測試控制器

  • 首先訪問鏈接(線程1):http://localhost:1111/curator/lock1 首先拿到鎖,鎖保持20s,操作,放鎖
  • 再訪問鏈接(線程2):http://localhost:1111/curator/lock2 等待獲取鎖,鎖保持15s,操作,放鎖
  • 結果分析

緩存架構SpringBoot集成Curator實現zookeeper分散式鎖

注釋:紅色——線程1執行結果;藍色——線程2執行結果;

代碼下載地址:https://gitee.com/jikeh/JiKeHCN-RELEASE.git 項目名:spring-boot-curator

喜歡這篇文章嗎?立刻分享出去讓更多人知道吧!

本站內容充實豐富,博大精深,小編精選每日熱門資訊,隨時更新,點擊「搶先收到最新資訊」瀏覽吧!


請您繼續閱讀更多來自 程序員小新人學習 的精彩文章:

演算法:分析基礎
spring源碼分析——spring大綱

TAG:程序員小新人學習 |