一区二区三区日韩精品-日韩经典一区二区三区-五月激情综合丁香婷婷-欧美精品中文字幕专区

分享

終極鎖實戰(zhàn):單JVM鎖+分布式鎖

 Baruch 2017-10-03

目錄

1.前言

2.單JVM鎖

3.分布式鎖

4.總結

=========正文分割線=================

1.前言

鎖就像一把鑰匙,需要加鎖的代碼就像一個房間。出現互斥操作的典型場景:多人同時想進同一個房間爭搶這個房間的鑰匙(只有一把),一人搶到鑰匙,其他人都等待這個人出來歸還鑰匙,此時大家再次爭搶鑰匙循環(huán)下去。

作為終極實戰(zhàn)系列,本篇用java語言分析鎖的原理(源碼剖析)和應用(詳細代碼),根據鎖的作用范圍分為:JVM鎖和分布式鎖。如理解有誤之處,還請指出。

2.單JVM鎖(進程級別)

程序部署在一臺服務器上,當容器啟動時(例如tomcat),一臺JVM就運行起來了。本節(jié)分析的鎖均只能在單JVM下生效。因為最終鎖定的是某個對象,這個對象生存在JVM中,自然鎖只能鎖單JVM。這一點很重要。如果你的服務只部署一個實例,那么恭喜你,用以下幾種鎖就可以了。

1.synchronized同步鎖

2.ReentrantLock重入鎖

3.ReadWriteLock讀寫鎖

4.StampedLock戳鎖

由于之前已經詳細分析過原理+使用,各位直接坐飛機吧:同步中的四種鎖synchronized、ReentrantLock、ReadWriteLock、StampedLock

3.分布式鎖(多服務節(jié)點,多進程)

3.1基于數據庫鎖實現

場景舉例:

賣商品,先查詢庫存>0,更新庫存-1。

 1.悲觀鎖:select for update(一致性鎖定讀)


查詢官方文檔如上圖,事務內起作用的行鎖。能夠保證當前session事務所鎖定的行不會被其他session所修改(這里的修改指更新或者刪除)。對讀取的記錄加X鎖,即排它鎖,其他事不能對上鎖的行加任何鎖。

BEGIN;(確保以下2步驟在一個事務中:)
SELECT * FROM tb_product_stock WHERE product_id=1 FOR UPDATE--->product_id有索引,鎖行.加鎖(注:條件字段必須有索引才能鎖行,否則鎖表,且最好用explain查看一下是否使用了索引,因為有一些會被優(yōu)化掉最終沒有使用索引
UPDATE tb_product_stock SET number=number-1 WHERE product_id=1--->更新庫存-1.解鎖
COMMIT;

 2.樂觀鎖:版本控制,選一個字段作為版本控制字段,更新前查詢一次,更新時該字段作為更新條件。不同業(yè)務場景,版本控制字段,可以0 1控制,也可以+1控制,也可以-1控制,這個隨意。

BEGIN;(確保以下2步驟在一個事務中:)
SELECT number FROM tb_product_stock WHERE product_id=1--》查詢庫存總數,不加鎖
UPDATE tb_product_stock SET number=number-1 WHERE product_id=1 AND number=第一步查詢到的庫存數--》number字段作為版本控制字段
COMMIT; 

3.2基于緩存實現(redis,memcached)

原理:

redisson開源jar包,提供了很多功能,其中就包含分布式鎖。是Redis官方推薦的頂級項目,官網飛機票

核心org.redisson.api.RLock接口封裝了分布式鎖的獲取和釋放。源碼如下:

復制代碼
 1 @Override
 2     public boolean tryLock(long waitTime, long leaseTime, TimeUnit unit) throws InterruptedException {
 3         long time = unit.toMillis(waitTime);
 4         long current = System.currentTimeMillis();
 5         final long threadId = Thread.currentThread().getId();
 6         Long ttl = tryAcquire(leaseTime, unit, threadId);//申請鎖,返回還剩余的鎖過期時間
 7         // lock acquired
 8         if (ttl == null) {
 9             return true;
10         }
11         
12         time -= (System.currentTimeMillis() - current);
13         if (time <= 0) {
14             acquireFailed(threadId);
15             return false;
16         }
17         
18         current = System.currentTimeMillis();
19         final RFuture<RedissonLockEntry> subscribeFuture = subscribe(threadId);
20         if (!await(subscribeFuture, time, TimeUnit.MILLISECONDS)) {
21             if (!subscribeFuture.cancel(false)) {
22                 subscribeFuture.addListener(new FutureListener<RedissonLockEntry>() {
23                     @Override
24                     public void operationComplete(Future<RedissonLockEntry> future) throws Exception {
25                         if (subscribeFuture.isSuccess()) {
26                             unsubscribe(subscribeFuture, threadId);
27                         }
28                     }
29                 });
30             }
31             acquireFailed(threadId);
32             return false;
33         }
34 
35         try {
36             time -= (System.currentTimeMillis() - current);
37             if (time <= 0) {
38                 acquireFailed(threadId);
39                 return false;
40             }
41         
42             while (true) {
43                 long currentTime = System.currentTimeMillis();
44                 ttl = tryAcquire(leaseTime, unit, threadId);
45                 // lock acquired
46                 if (ttl == null) {
47                     return true;
48                 }
49 
50                 time -= (System.currentTimeMillis() - currentTime);
51                 if (time <= 0) {
52                     acquireFailed(threadId);
53                     return false;
54                 }
55 
56                 // waiting for message
57                 currentTime = System.currentTimeMillis();
58                 if (ttl >= 0 && ttl < time) {
59                     getEntry(threadId).getLatch().tryAcquire(ttl, TimeUnit.MILLISECONDS);
60                 } else {
61                     getEntry(threadId).getLatch().tryAcquire(time, TimeUnit.MILLISECONDS);
62                 }
63 
64                 time -= (System.currentTimeMillis() - currentTime);
65                 if (time <= 0) {
66                     acquireFailed(threadId);
67                     return false;
68                 }
69             }
70         } finally {
71             unsubscribe(subscribeFuture, threadId);
72         }
73 //        return get(tryLockAsync(waitTime, leaseTime, unit));
74     }
復制代碼

 

上述方法,調用加鎖的邏輯就是在tryAcquire(leaseTime, unit, threadId)中,如下圖:

1 private Long tryAcquire(long leaseTime, TimeUnit unit, long threadId) {
2     return get(tryAcquireAsync(leaseTime, unit, threadId));//tryAcquireAsync返回RFutrue
3 }
tryAcquireAsync中commandExecutor.evalWriteAsync就是咱們加鎖核心方法了
復制代碼
 1 <T> RFuture<T> tryLockInnerAsync(long leaseTime, TimeUnit unit, long threadId, RedisStrictCommand<T> command) {
 2         internalLockLeaseTime = unit.toMillis(leaseTime);
 3 
 4         return commandExecutor.evalWriteAsync(getName(), LongCodec.INSTANCE, command,
 5                   "if (redis.call('exists', KEYS[1]) == 0) then " +
 6                       "redis.call('hset', KEYS[1], ARGV[2], 1); " +
 7                       "redis.call('pexpire', KEYS[1], ARGV[1]); " +
 8                       "return nil; " +
 9                   "end; " +
10                   "if (redis.call('hexists', KEYS[1], ARGV[2]) == 1) then " +
11                       "redis.call('hincrby', KEYS[1], ARGV[2], 1); " +
12                       "redis.call('pexpire', KEYS[1], ARGV[1]); " +
13                       "return nil; " +
14                   "end; " +
15                   "return redis.call('pttl', KEYS[1]);",
16                     Collections.<Object>singletonList(getName()), internalLockLeaseTime, getLockName(threadId));
17     }
復制代碼

如上圖,已經到了redis命令了

加鎖:

  • KEYS[1] :需要加鎖的key,這里需要是字符串類型。
  • ARGV[1] :鎖的超時時間,防止死鎖
  • ARGV[2] :鎖的唯一標識,(UUID.randomUUID()) + “:” + threadId
復制代碼
 1 // 檢查是否key已經被占用,如果沒有則設置超時時間和唯一標識,初始化value=1
 2 if (redis.call('exists', KEYS[1]) == 0) 
 3 then  
 4 redis.call('hset', KEYS[1], ARGV[2], 1); //hset key field value 哈希數據結構
 5 redis.call('pexpire', KEYS[1], ARGV[1]); //pexpire key expireTime 設置有效時間 
 6 return nil; 
 7 end; 
 8 // 如果鎖重入,需要判斷鎖的key field 都一直情況下 value 加一
 9 if (redis.call('hexists', KEYS[1], ARGV[2]) == 1) 
10 then 
11 redis.call('hincrby', KEYS[1], ARGV[2], 1);//hincrby key filed addValue 加1
12 redis.call('pexpire', KEYS[1], ARGV[1]);//pexpire key expireTime重新設置超時時間
13 return nil; 
14 end; 
15 // 返回剩余的過期時間
16 return redis.call('pttl', KEYS[1]);
復制代碼

以上的方法,當返回空是,說明獲取到鎖,如果返回一個long數值(pttl 命令的返回值),說明鎖已被占用,通過返回剩余時間,外部可以做一些等待時間的判斷和調整。

不再分析解鎖步驟,直接貼上解鎖的redis 命令

解鎖:

– KEYS[1] :需要加鎖的key,這里需要是字符串類型。

– KEYS[2] :redis消息的ChannelName,一個分布式鎖對應唯一的一個channelName:“redisson_lock__channel__{” + getName() + “}”

– ARGV[1] :reids消息體,這里只需要一個字節(jié)的標記就可以,主要標記redis的key已經解鎖,再結合redis的Subscribe,能喚醒其他訂閱解鎖消息的客戶端線程申請鎖。

– ARGV[2] :鎖的超時時間,防止死鎖

– ARGV[3] :鎖的唯一標識,(UUID.randomUUID()) + “:” + threadId

復制代碼
 1 // 如果key已經不存在,說明已經被解鎖,直接發(fā)布(publihs)redis消息
 2 if (redis.call('exists', KEYS[1]) == 0) 
 3 then
 4     redis.call('publish', KEYS[2], ARGV[1]);//publish ChannelName message向信道發(fā)送解鎖消息
 5     return 1;
 6 end;
 7 // key和field不匹配,說明當前客戶端線程沒有持有鎖,不能主動解鎖。
 8 if (redis.call('hexists', KEYS[1], ARGV[3]) == 0)
 9 then 
10     return nil;
11 end; 
12 // 將value減1
13 local counter = redis.call('hincrby', KEYS[1], ARGV[3], -1); //hincrby key filed addValue 減1
14 // 如果counter>0說明鎖在重入,不能刪除key
15 if (counter > 0)  
16 then
17     redis.call('pexpire', KEYS[1], ARGV[2]);                             
18     return 0; 
19 else 
20     // 刪除key并且publish 解鎖消息
21     redis.call('del', KEYS[1]);                            
22     redis.call('publish', KEYS[2], ARGV[1]); 
23     return 1; 
24 end; 
25 return nil;
復制代碼

 特點:

邏輯并不復雜, 實現了可重入功能, 通過pub/sub功能來減少空轉,性能極高。

實現了Lock的大部分功能,支持強制解鎖

 

實戰(zhàn):

1.創(chuàng)建客戶端配置類:

這里我們最終只用了一種來測試,就是initSingleServerConfig單例模式。

復制代碼
  1 package distributed.lock.redis;
  2 
  3 import org.redisson.config.Config;
  4 
  5 /**
  6  * 
  7  * @ClassName:RedissionConfig
  8  * @Description:自定義RedissionConfig初始化方法
  9  * 支持自定義構造:單例模式,集群模式,主從模式,哨兵模式。
 10  * 注:此處使用spring bean 配置文件保證bean單例,見applicationContext-redis.xml
 11  * 大家也可以用工廠模式自己維護單例:本類生成RedissionConfig,再RedissonClient redisson = Redisson.create(config);這樣就可以創(chuàng)建RedissonClient
 12  * @author diandian.zhang
 13  * @date 2017年7月20日下午12:55:50
 14  */
 15 public class RedissionConfig {
 16     private RedissionConfig() {
 17     }
 18 
 19     public static Config initSingleServerConfig(String redisHost, String redisPort, String redisPassword) {
 20         return initSingleServerConfig(redisHost, redisPort, redisPassword, 0);
 21     }
 22 
 23     /**
 24      * 
 25      * @Description 使用單例模式初始化構造Config
 26      * @param redisHost
 27      * @param redisPort
 28      * @param redisPassword
 29      * @param redisDatabase redis db 默認0 (0~15)有redis.conf配置文件中參數來控制數據庫總數:database 16.
 30      * @return
 31      * @author diandian.zhang
 32      * @date 2017年7月20日下午12:56:21
 33      * @since JDK1.8
 34      */
 35     public static Config initSingleServerConfig(String redisHost, String redisPort, String redisPassword,Integer redisDatabase) {
 36         Config config = new Config();
 37         config.useSingleServer().setAddress(redisHost + ":" + redisPort)
 38         .setPassword(redisPassword)
 39         .setDatabase(redisDatabase);//可以不設置,看業(yè)務是否需要隔離
 40         //RedissonClient redisson = Redisson.create(config);
 41         return config;
 42     }
 43     
 44     /**
 45      * 
 46      * @Description 集群模式
 47      * @param masterAddress
 48      * @param nodeAddressArray
 49      * @return
 50      * @author diandian.zhang
 51      * @date 2017年7月20日下午3:29:32
 52      * @since JDK1.8
 53      */
 54     public static Config initClusterServerConfig(String masterAddress, String[] nodeAddressArray) {
 55             String nodeStr = "";
 56         for(String slave:nodeAddressArray){
 57             nodeStr +=","+slave;
 58         }
 59         Config config = new Config();
 60         config.useClusterServers()
 61             .setScanInterval(2000) // cluster state scan interval in milliseconds
 62             .addNodeAddress(nodeStr);
 63        return config;
 64    }
 65     
 66     /**
 67      * 
 68      * @Description 主從模式
 69      * @param masterAddress 一主
 70      * @param slaveAddressArray 多從
 71      * @return
 72      * @author diandian.zhang
 73      * @date 2017年7月20日下午2:29:38
 74      * @since JDK1.8
 75      */
 76     public static Config initMasterSlaveServerConfig(String masterAddress, String[] slaveAddressArray) {
 77          String slaveStr = "";
 78          for(String slave:slaveAddressArray){
 79              slaveStr +=","+slave;
 80          }
 81         Config config = new Config();
 82         config.useMasterSlaveServers()
 83         .setMasterAddress(masterAddress)//一主
 84         .addSlaveAddress(slaveStr);//多從"127.0.0.1:26389", "127.0.0.1:26379"
 85         return config;
 86     }
 87     
 88     /**
 89      * 
 90      * @Description 哨兵模式
 91      * @param masterAddress
 92      * @param slaveAddressArray
 93      * @return
 94      * @author diandian.zhang
 95      * @date 2017年7月20日下午3:01:35
 96      * @since JDK1.8
 97      */
 98     public static Config initSentinelServerConfig(String masterAddress, String[] sentinelAddressArray) {
 99         String sentinelStr = "";
100         for(String sentinel:sentinelAddressArray){
101             sentinelStr +=","+sentinel;
102         }
103         Config config = new Config();
104         config.useSentinelServers()
105         .setMasterName("mymaster")
106         .addSentinelAddress(sentinelStr);
107         return config;
108     }
109     
110     
111 }
復制代碼

 

2.分布式鎖實現類

復制代碼
  1 package distributed.lock.redis;
  2 
  3 
  4 
  5 import java.text.SimpleDateFormat;
  6 import java.util.Date;
  7 import java.util.concurrent.CountDownLatch;
  8 import java.util.concurrent.TimeUnit;
  9 
 10 import org.redisson.Redisson;
 11 import org.redisson.api.RLock;
 12 import org.redisson.api.RedissonClient;
 13 import org.slf4j.Logger;
 14 import org.slf4j.LoggerFactory;
 15 
 16 
 17 public class RedissonTest {
 18     private static final Logger logger = LoggerFactory.getLogger(RedissonTest.class);
 19     static SimpleDateFormat time = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss");
 20     //這里可自定義多種模式,單例,集群,主從,哨兵模式。為了簡單這里使用單例模式
 21     private static RedissonClient redissonClient = Redisson.create(RedissionConfig.initSingleServerConfig("192.168.50.107", "6379", "password"));
 22     
 23     public static void main(String[] args) {
 24         CountDownLatch latch = new CountDownLatch(3);
 25          // key
 26         String lockKey = "testkey20170802";
 27         try {
 28             Thread t1 = new Thread(() -> {
 29                 doWithLock(lockKey,latch);//函數式編程
 30             }, "t1");
 31             Thread t2 = new Thread(() -> {
 32                 doWithLock(lockKey,latch);
 33             }, "t2");
 34             Thread t3 = new Thread(() -> {
 35                 doWithLock(lockKey,latch);
 36             }, "t3");
 37             //啟動線程
 38             t1.start();
 39             t2.start();
 40             t3.start(); 
 41             //等待全部完成
 42             latch.await();
 43             System.out.println("3個線程都解鎖完畢,關閉客戶端!");
 44             redissonClient.shutdown();
 45         } catch (Exception e) {
 46             e.printStackTrace();
 47         }
 48     }
 49     
 50     /**
 51      * 
 52      * @Description 線程執(zhí)行函數體
 53      * @param lockKey
 54      * @author diandian.zhang
 55      * @date 2017年8月2日下午3:37:32
 56      * @since JDK1.8
 57      */
 58     private static void doWithLock(String lockKey,CountDownLatch latch) {
 59         try {
 60             System.out.println("進入線程="+Thread.currentThread().getName()+":"+time.format(new Date()));
 61             //獲取鎖,30秒內獲取到返回true,未獲取到返回false,60秒過后自動unLock
 62             if (tryLock(lockKey, 30, 60, TimeUnit.SECONDS)) {
 63                 System.out.println(Thread.currentThread().getName() + " 獲取鎖成功!,執(zhí)行需要加鎖的任務"+time.format(new Date()));
 64                 Thread.sleep(2000L);//休息2秒模擬執(zhí)行需要加鎖的任務
 65             //獲取鎖超時
 66             }else{
 67                 System.out.println(Thread.currentThread().getName() + " 獲取鎖超時!"+time.format(new Date()));
 68             }
 69         } catch (Exception e) {
 70             e.printStackTrace();
 71         } finally {
 72             try {
 73                 //釋放鎖
 74                 unLock(lockKey);
 75                 latch.countDown();//完成,計數器減一  
 76             } catch (Exception e) {
 77                 e.printStackTrace();
 78             }
 79         }
 80     }
 81     
 82     /**
 83      * 
 84      * @Description 獲取鎖,鎖waitTime時間內獲取到返回true,未獲取到返回false,租賃期leaseTime過后unLock(除非手動釋放鎖)
 85      * @param key
 86      * @param waitTime
 87      * @param leaseTime
 88      * @param timeUnit
 89      * @return
 90      * @author diandian.zhang
 91      * @date 2017年8月2日下午3:24:09
 92      * @since JDK1.8
 93      */
 94     public static boolean tryLock(String key, long waitTime, long leaseTime, TimeUnit timeUnit) {
 95         try {
 96             //根據key獲取鎖實例,非公平鎖
 97             RLock lock = redissonClient.getLock(key);
 98             //在leaseTime時間內阻塞獲取鎖,獲取鎖后持有鎖直到leaseTime租期結束(除非手動unLock釋放鎖)。
 99             return lock.tryLock(waitTime, leaseTime, timeUnit);
100         } catch (Exception e) {
101             logger.error("redis獲取分布式鎖異常;key=" + key + ",waitTime=" + waitTime + ",leaseTime=" + leaseTime +
102                     ",timeUnit=" + timeUnit, e);
103             return false;
104         }
105     }
106     
107     /**
108      * 
109      * @Description 釋放鎖
110      * @param key
111      * @author diandian.zhang
112      * @date 2017年8月2日下午3:25:34
113      * @since JDK1.8
114      */
115     public static void unLock(String key) {
116         RLock lock = redissonClient.getLock(key);
117         lock.unlock();
118         System.out.println(Thread.currentThread().getName() + " 釋放鎖"+time.format(new Date()));
119     }
120 }
復制代碼

 

執(zhí)行結果如下:

復制代碼
 1 進入線程=t3:2017-08-02 16:33:19
 2 進入線程=t1:2017-08-02 16:33:19
 3 進入線程=t2:2017-08-02 16:33:19
 4 t2 獲取鎖成功!,執(zhí)行需要加鎖的任務2017-08-02 16:33:19--->T2  19秒時獲取到鎖
 5 t2 釋放鎖2017-08-02 16:33:21--->T2任務完成,21秒時釋放鎖
 6 t1 獲取鎖成功!,執(zhí)行需要加鎖的任務2017-08-02 16:33:21--->T1  21秒時獲取到鎖
 7 t1 釋放鎖2017-08-02 16:33:23--->T2任務完成,23秒時釋放鎖
 8 t3 獲取鎖成功!,執(zhí)行需要加鎖的任務2017-08-02 16:33:23--->T3  23秒時獲取到鎖
 9 t3 釋放鎖2017-08-02 16:33:25--->T2任務完成,25秒時釋放鎖
10 3個線程都解鎖完畢,關閉客戶端!
復制代碼

如上圖,3個線程共消耗25-19=6秒,驗證通過,確實互斥鎖住了。

我們用Redis Desktop Manger來看一下redis中數據:

復制代碼
 1 192.168.50.107:0>hgetall "testkey20170802"--->用key查詢hash所有的值
 2 1) 159b46b3-8bc5-4447-ad57-c55fdd381384:30--->T2獲取到鎖field=uuid:線程號
 3 2) 1                                      --->value=1代表重入次數為1
 4 192.168.50.107:0>hgetall "testkey20170802"--->T2釋放鎖,T1獲取到鎖
 5 1) 159b46b3-8bc5-4447-ad57-c55fdd381384:29
 6 2) 1
 7 192.168.50.107:0>hgetall "testkey20170802"--->T1釋放鎖,T3獲取到鎖
 8 1) 159b46b3-8bc5-4447-ad57-c55fdd381384:31
 9 2) 1
10 192.168.50.107:0>hgetall "testkey20170802"--->最后一次查詢時,T3釋放鎖,已無數據
復制代碼

 

2)基于zookeeper實現

原理:

每個客戶端(每個JVM內部共用一個客戶端實例)對某個方法加鎖時,在zookeeper上指定節(jié)點的目錄下,生成一個唯一的瞬時有序節(jié)點。判斷是否獲取鎖的方式很簡單,只需要判斷有序節(jié)點中序號最小的一個。當釋放鎖的時候,只需將這個瞬時節(jié)點刪除即可。

我們使用apache的Curator組件來實現,一般使用Client、Framework、Recipes三個組件。

curator下,InterProcessMutex可重入互斥公平鎖,源碼(curator-recipes-2.4.1.jar)注釋如下:

A re-entrant mutex that works across JVMs. Uses Zookeeper to hold the lock. All processes in all JVMs that use the same lock path will achieve an inter-process critical section. Further, this mutex is "fair" - each user will get the mutex in the order requested (from ZK's point of view)

即一個在JVM上工作的可重入互斥鎖。使用ZK去持有這把鎖。在所有JVM中的進程組,只要使用相同的鎖路徑將會獲得進程間的臨界資源。進一步說,這個互斥鎖是公平的-因為每個線程將會根據請求順序獲得這個互斥量(對于ZK來說)

主要方法如下:

 

復制代碼
1     // 構造方法
2     public InterProcessMutex(CuratorFramework client, String path)
3     public InterProcessMutex(CuratorFramework client, String path, LockInternalsDriver driver)
4     // 通過acquire獲得鎖,并提供超時機制:
5     public void acquire() throws Exception
6     public boolean acquire(long time, TimeUnit unit) throws Exception
7     // 撤銷鎖
8     public void makeRevocable(RevocationListener<InterProcessMutex> listener)
9     public void makeRevocable(final RevocationListener<InterProcessMutex> listener, Executor executor)
復制代碼

 

我們主要分析核心獲取鎖acquire方法如下:

 

復制代碼
 1 @Override
 2     public boolean acquire(long time, TimeUnit unit) throws Exception
 3     {
 4         return internalLock(time, unit);
 5     }
 6 
 7 private boolean internalLock(long time, TimeUnit unit) throws Exception
 8     {
 9         /*
10            Note on concurrency: a given lockData instance
11            can be only acted on by a single thread so locking isn't necessary
12         */
13 
14         Thread          currentThread = Thread.currentThread();
15         //線程安全map:private final ConcurrentMap<Thread, LockData>   threadData = Maps.newConcurrentMap();
16         LockData        lockData = threadData.get(currentThread);
17         if ( lockData != null )
18         {
19             //這里實現了可重入,如果當前線程已經獲取鎖,計數+1,直接返回true
20             lockData.lockCount.incrementAndGet();
21             return true;
22         }
23         //獲取鎖,核心方法
24         String lockPath = internals.attemptLock(time, unit, getLockNodeBytes());
25         if ( lockPath != null )
26         {   //得到鎖,塞進線程安全map
27             LockData        newLockData = new LockData(currentThread, lockPath);
28             threadData.put(currentThread, newLockData);
29             return true;
30         }
31 
32         return false;
33     }
復制代碼

 

核心獲取鎖的方法attemptLock源碼如下:

復制代碼
 1 String attemptLock(long time, TimeUnit unit, byte[] lockNodeBytes) throws Exception
 2     {
 3         final long      startMillis = System.currentTimeMillis();
 4         final Long      millisToWait = (unit != null) ? unit.toMillis(time) : null;
 5         final byte[]    localLockNodeBytes = (revocable.get() != null) ? new byte[0] : lockNodeBytes;
 6         int             retryCount = 0;
 7 
 8         String          ourPath = null;
 9         boolean         hasTheLock = false;
10         boolean         isDone = false;
11         while ( !isDone )
12         {
13             isDone = true;
14 
15             try
16             {
17                 if ( localLockNodeBytes != null )
18                 {   
19                     ourPath = client.create().creatingParentsIfNeeded().withProtection().withMode(CreateMode.EPHEMERAL_SEQUENTIAL).forPath(path, localLockNodeBytes);
20                 }
21                 else
22                 {   //創(chuàng)建瞬時節(jié)點(客戶端斷開連接時刪除),節(jié)點名追加自增數字
23                     ourPath = client.create().creatingParentsIfNeeded().withProtection().withMode(CreateMode.EPHEMERAL_SEQUENTIAL).forPath(path);
24                 }
//自循環(huán)等待時間,并判斷是否獲取到鎖 25 hasTheLock = internalLockLoop(startMillis, millisToWait, ourPath); 26 } 27 catch ( KeeperException.NoNodeException e ) 28 { 29 // gets thrown by StandardLockInternalsDriver when it can't find the lock node 30 // this can happen when the session expires, etc. So, if the retry allows, just try it all again 31 if ( client.getZookeeperClient().getRetryPolicy().allowRetry(retryCount++, System.currentTimeMillis() - startMillis, RetryLoop.getDefaultRetrySleeper()) ) 32 { 33 isDone = false; 34 } 35 else 36 { 37 throw e; 38 } 39 } 40 } 41 //獲取到鎖返回節(jié)點path 42 if ( hasTheLock ) 43 { 44 return ourPath; 45 } 46 47 return null; 48 }
復制代碼
自循環(huán)等待時間:
復制代碼
 1  private boolean internalLockLoop(long startMillis, Long millisToWait, String ourPath) throws Exception
 2     {
 3         boolean     haveTheLock = false;
 4         boolean     doDelete = false;
 5         try
 6         {
 7             if ( revocable.get() != null )
 8             {
 9                 client.getData().usingWatcher(revocableWatcher).forPath(ourPath);
10             }
11 
12             while ( (client.getState() == CuratorFrameworkState.STARTED) && !haveTheLock )//如果狀態(tài)是開始且未獲取到鎖
13             {
14                 List<String>        children = getSortedChildren();//獲取父節(jié)點下所有線程的子節(jié)點
15                 String              sequenceNodeName = ourPath.substring(basePath.length() + 1); // 獲取當前節(jié)點名稱
16                 //核心方法:判斷是否獲取到鎖
17                 PredicateResults    predicateResults = driver.getsTheLock(client, children, sequenceNodeName, maxLeases);
18                 if ( predicateResults.getsTheLock() )//獲取到鎖,置true,下一次循環(huán)退出
19                 {
20                     haveTheLock = true;
21                 }
22                 else//沒有索取到鎖
23                 {
24                     String  previousSequencePath = basePath + "/" + predicateResults.getPathToWatch();//這里路徑是上一次獲取到鎖的持有鎖路徑
25 
26                     synchronized(this)//強制加鎖
27                     {
                 //讓線程等待,并且watcher當前節(jié)點,當節(jié)點有變化的之后,則notifyAll當前等待的線程,讓它再次進入來爭搶鎖 28 Stat stat = client.checkExists().usingWatcher(watcher).forPath(previousSequencePath); 29 if ( stat != null ) 30 { 31 if ( millisToWait != null ) 32 { 33 millisToWait -= (System.currentTimeMillis() - startMillis); 34 startMillis = System.currentTimeMillis(); 35 if ( millisToWait <= 0 ) 36 { 37 doDelete = true; //等待超時,置狀態(tài)為true,后面會刪除節(jié)點 38 break; 39 } 40 //等待指定時間 41 wait(millisToWait); 42 } 43 else 44 { //一直等待 45 wait(); 46 } 47 } 48 } 49 // else it may have been deleted (i.e. lock released). Try to acquire again 50 } 51 } 52 } 53 catch ( Exception e ) 54 { 55 doDelete = true; 56 throw e; 57 } 58 finally 59 { 60 if ( doDelete )//刪除path 61 { 62 deleteOurPath(ourPath); 63 } 64 } 65 return haveTheLock; 66 }
復制代碼
復制代碼
 1 @Override           
 2     public PredicateResults getsTheLock(CuratorFramework client, List<String> children, String sequenceNodeName, int maxLeases) throws Exception
 3     {
 4         int             ourIndex = children.indexOf(sequenceNodeName);//先根據子節(jié)點名獲取children(所有子節(jié)點升序集合)中的索引
 5         validateOurIndex(sequenceNodeName, ourIndex);//校驗如果索引為負值,即不存在該子節(jié)點
 6         //maxLeases允許同時租賃的數量,這里源代碼寫死了1,但這種設計符合將來拓展,修改maxLeases即可滿足多租賃
 7         boolean         getsTheLock = ourIndex < maxLeases;//maxLeases=1,所以只有當index=0時才是true,即所有子節(jié)點中升序排序第一個最小值,即第一個請求過來的,這就是核心思想所在!
 8         String          pathToWatch = getsTheLock ? null : children.get(ourIndex - maxLeases);//獲取到鎖返回null,否則未獲取到鎖,獲取上一次的獲取到鎖的路徑。后面會監(jiān)視這個路徑用以喚醒請求線程
 9        
10         return new PredicateResults(pathToWatch, getsTheLock);
11     }
復制代碼

 

特點:

1.可避免死鎖:zk瞬時節(jié)點(Ephemeral Nodes)生命周期和session一致,session結束,節(jié)點自動刪除。
2.依賴zk創(chuàng)建節(jié)點,涉及文件操作,開銷較大。

實戰(zhàn):

1.創(chuàng)建客戶端client
2.生成互斥鎖InterProcessMutex
3.開啟3個線程去獲取鎖

復制代碼
 1 package distributed.lock.zk;
 2 
 3 import java.text.SimpleDateFormat;
 4 import java.util.Date;
 5 import java.util.concurrent.TimeUnit;
 6 
 7 import org.apache.curator.framework.CuratorFramework;
 8 import org.apache.curator.framework.CuratorFrameworkFactory;
 9 import org.apache.curator.framework.recipes.locks.InterProcessMutex;
10 import org.apache.curator.retry.ExponentialBackoffRetry;
11 import org.apache.curator.retry.RetryNTimes;
12 import org.jboss.netty.channel.StaticChannelPipeline;
13 import org.omg.CORBA.PRIVATE_MEMBER;
14 
15 /**
16  * 
17  * @ClassName:CuratorDistrLockTest
18  * @Description:Curator包實現zk分布式鎖:利用了zookeeper的臨時順序節(jié)點特性,一旦客戶端失去連接后,則就會自動清除該節(jié)點。
19  * @author diandian.zhang
20  * @date 2017年7月11日下午12:43:44
21  */
22 
23 public class CuratorDistrLock {
24     private static final String ZK_ADDRESS = "192.168.50.253:2181";//zk
25     private static final String ZK_LOCK_PATH = "/zktest";//path
26     static SimpleDateFormat time = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss");
27     
28     public static void main(String[] args) {
29         try {
30             //創(chuàng)建zk客戶端
31 //            CuratorFramework client = CuratorFrameworkFactory.newClient(ZK_ADDRESS,new RetryNTimes(3, 1000));
32             CuratorFramework client = CuratorFrameworkFactory.builder()
33                     .connectString(ZK_ADDRESS)
34                     .sessionTimeoutMs(5000)
35                     .retryPolicy(new ExponentialBackoffRetry(1000, 10))
36                     .build();
37             //開啟
38             client.start();
39             System.out.println("zk client start successfully!"+time.format(new Date()));
40             
41             Thread t1 = new Thread(() -> {
42                 doWithLock(client);//函數式編程
43             }, "t1");
44             Thread t2 = new Thread(() -> {
45                 doWithLock(client);
46             }, "t2");
47             Thread t3 = new Thread(() -> {
48                 doWithLock(client);
49             }, "t3");
50             //啟動線程
51             t1.start();
52             t2.start();
53             t3.start(); 
54         } catch (Exception e) {
55             e.printStackTrace();
56         }
57     }
58 
59     /**
60      * 
61      * @Description 線程執(zhí)行函數體
62      * @param client
63      * @param lock
64      * @author diandian.zhang
65      * @date 2017年7月12日下午6:00:53
66      * @since JDK1.8
67      */
68     private static void doWithLock(CuratorFramework client) {
69         //依賴ZK生成的可重入互斥公平鎖(按照請求順序)
70         InterProcessMutex lock = new InterProcessMutex(client, ZK_LOCK_PATH);
71         try {
72             System.out.println("進入線程="+Thread.currentThread().getName()+":"+time.format(new Date()));
73             
74             //花20秒時間嘗試獲取鎖
75             if (lock.acquire(20, TimeUnit.SECONDS)) {
76                 System.out.println(Thread.currentThread().getName() + " 獲取鎖成功!,執(zhí)行需要加鎖的任務"+time.format(new Date()));
77                 Thread.sleep(2000L);//休息2秒模擬執(zhí)行需要加鎖的任務
78             //獲取鎖超時
79             }else{
80                 System.out.println(Thread.currentThread().getName() + " 獲取鎖超時!"+time.format(new Date()));
81             }
82         } catch (Exception e) {
83             e.printStackTrace();
84         } finally {
85             try {
86                 //當前線程獲取到鎖,那么最后需要釋放鎖(實際上是刪除節(jié)點)
87                 if (lock.isAcquiredInThisProcess()) {
88                     lock.release();
89                     System.out.println(Thread.currentThread().getName() + " 釋放鎖"+time.format(new Date()));
90                 }
91             } catch (Exception e) {
92                 e.printStackTrace();
93             }
94         }
95     }
96 
97 }
復制代碼

 

執(zhí)行結果:

復制代碼
zk client start successfully!
進入線程=t2:2017-07-13 11:13:23
進入線程=t1:2017-07-13 11:13:23
進入線程=t3:2017-07-13 11:13:23
t2 獲取鎖成功!,執(zhí)行需要加鎖的任務2017-07-13 11:13:23----》起始時間23秒
t2 釋放鎖2017-07-13 11:13:25
t3 獲取鎖成功!,執(zhí)行需要加鎖的任務2017-07-13 11:13:25----》驗證耗時2秒,T2執(zhí)行完,T3執(zhí)行
t3 釋放鎖2017-07-13 11:13:27
t1 獲取鎖成功!,執(zhí)行需要加鎖的任務2017-07-13 11:13:27----》驗證耗時2秒,T3執(zhí)行完,T1執(zhí)行
t1 釋放鎖2017-07-13 11:13:29----》驗證耗時2秒,T1執(zhí)行完,3個任務共耗時=29-23=6秒,驗證互斥鎖達到目標。
復制代碼

查看zookeeper節(jié)點

1.客戶端連接

zkCli.sh -server 192.168.50.253:2181

2.查看節(jié)點

[zk: 192.168.50.253:2181(CONNECTED) 80] ls /-----》查看根目錄
[dubbo, zktest, zookeeper, test]

[zk: 192.168.50.253:2181(CONNECTED) 81] ls /zktest -----》查看我們創(chuàng)建的子節(jié)點
[_c_034e5f23-abaf-4d4a-856f-c27956db574e-lock-0000000007, _c_63c708f1-2c3c-4e59-9d5b-f0c70c149758-lock-0000000006, _c_1f688cb7-c38c-4ebb-8909-0ba421e484a4-lock-0000000008]

[zk: 192.168.50.253:2181(CONNECTED) 82] ls /zktest-----》任務執(zhí)行完畢最終釋放了子節(jié)點
[]

4.總結比較

一級鎖分類

二級鎖分類

鎖名稱

特性

是否推薦

單JVM鎖

基于JVM源生synchronized關鍵字實現

synchronized同步鎖

 適用于低并發(fā)的情況,性能穩(wěn)定。 新手推薦
基于JDK實現,需顯示獲取鎖,釋放鎖

ReentrantLock可重入鎖

 適用于低、高并發(fā)的情況,性能較高  需要指定公平、非公平或condition時使用。

ReentrantReadWriteLock

可重入讀寫鎖

 適用于讀多寫少的情況。性能高。  老司機推薦

StampedLock戳鎖

 JDK8才有,適用于高并發(fā)且讀遠大于寫時,支持樂觀讀,票據校驗失敗后可升級悲觀讀鎖,性能極高!  老司機推薦

分布式鎖

基于數據庫鎖實現

悲觀鎖:select for update

 sql直接使用,但水很深。涉及數據庫ACID原理+隔離級別+不同數據庫規(guī)范
 不推薦

樂觀鎖:版本控制

 自己實現字段版本控制  新手推薦

基于緩存實現

org.redisson

 性能極高,支持除了分布式鎖外還實現了分布式對象、分布式集合等極端強大的功能  老司機推薦

基于zookeeper實現

org.apache.curator zookeeper

 性能較高,除支持分布式鎖外,還實現了master選舉、節(jié)點監(jiān)聽()、分布式隊列、Barrier、AtomicLong等計數器  老司機推薦

 

=====附Redis命令=======

  1. SETNX key value (SET if Not eXists):當且僅當 key 不存在,將 key 的值設為 value ,并返回1;若給定的 key 已經存在,則 SETNX 不做任何動作,并返回0。詳見:SETNX commond
  2. GETSET key value:將給定 key 的值設為 value ,并返回 key 的舊值 (old value),當 key 存在但不是字符串類型時,返回一個錯誤,當key不存在時,返回nil。詳見:GETSET commond
  3. GET key:返回 key 所關聯(lián)的字符串值,如果 key 不存在那么返回 nil 。詳見:GET Commond
  4. DEL key [KEY …]:刪除給定的一個或多個 key ,不存在的 key 會被忽略,返回實際刪除的key的個數(integer)。詳見:DEL Commond
  5. HSET key field value:給一個key 設置一個{field=value}的組合值,如果key沒有就直接賦值并返回1,如果field已有,那么就更新value的值,并返回0.詳見:HSET Commond
  6. HEXISTS key field:當key 中存儲著field的時候返回1,如果key或者field至少有一個不存在返回0。詳見HEXISTS Commond
  7. HINCRBY key field increment:將存儲在 key 中的哈希(Hash)對象中的指定字段 field 的值加上增量 increment。如果鍵 key 不存在,一個保存了哈希對象的新建將被創(chuàng)建。如果字段 field 不存在,在進行當前操作前,其將被創(chuàng)建,且對應的值被置為 0。返回值是增量之后的值。詳見:HINCRBY Commond
  8. PEXPIRE key milliseconds:設置存活時間,單位是毫秒。expire操作單位是秒。詳見:PEXPIRE Commond
  9. PUBLISH channel message:向channel post一個message內容的消息,返回接收消息的客戶端數。詳見PUBLISH Commond

======參考======

分布式鎖的幾種實現方式~

 基于Redis實現分布式鎖,Redisson使用及源碼分析

    本站是提供個人知識管理的網絡存儲空間,所有內容均由用戶發(fā)布,不代表本站觀點。請注意甄別內容中的聯(lián)系方式、誘導購買等信息,謹防詐騙。如發(fā)現有害或侵權內容,請點擊一鍵舉報。
    轉藏 分享 獻花(0

    0條評論

    發(fā)表

    請遵守用戶 評論公約

    類似文章 更多

    亚洲国产日韩欧美三级| 91国自产精品中文字幕亚洲| 久久精品国产亚洲av久按摩| 久久人人爽人人爽大片av| 成人精品视频一区二区在线观看| 国产欧美日韩精品成人专区| 国产精品一区二区视频| 麻豆视传媒短视频在线看| 国产高清一区二区不卡| 自拍偷女厕所拍偷区亚洲综合| 欧美午夜视频免费观看| 日韩精品一区二区亚洲| 久久精品国产99精品亚洲| 国产精品视频久久一区| 白白操白白在线免费观看 | 一区二区三区日韩中文| 在线亚洲成人中文字幕高清| 色哟哟哟在线观看视频| 亚洲熟妇中文字幕五十路| 日韩欧美在线看一卡一卡| 美国欧洲日本韩国二本道| 美国欧洲日本韩国二本道| 在线观看中文字幕91| 日本高清二区视频久二区| 精品国产av一区二区三区不卡蜜 | 九九热这里只有精品哦| 人妻人妻人人妻人人澡| 国产一区二区三中文字幕| 亚洲天堂国产精品久久精品| 高清国产日韩欧美熟女| 亚洲最大的中文字幕在线视频 | 日本少妇中文字幕不卡视频| 韩日黄片在线免费观看| 欧美日韩视频中文字幕| 东京热加勒比一区二区| 久久亚洲国产视频三级黄| 欧美成人黄色一级视频| 日韩人妻中文字幕精品| 99久久无色码中文字幕免费| 爱在午夜降临前在线观看| 99少妇偷拍视频在线|