目錄 1.前言 2.單JVM鎖 3.分布式鎖 4.總結 =========正文分割線================= 1.前言鎖就像一把鑰匙,需要加鎖的代碼就像一個房間。出現互斥操作的典型場景:多人同時想進同一個房間爭搶這個房間的鑰匙(只有一把),一人搶到鑰匙,其他人都等待這個人出來歸還鑰匙,此時大家再次爭搶鑰匙循環(huán)下去。 作為終極實戰(zhàn)系列,本篇用java語言分析鎖的原理(源碼剖析)和應用(詳細代碼),根據鎖的作用范圍分為:JVM鎖和分布式鎖。如理解有誤之處,還請指出。 2.單JVM鎖(進程級別)程序部署在一臺服務器上,當容器啟動時(例如tomcat),一臺JVM就運行起來了。本節(jié)分析的鎖均只能在單JVM下生效。因為最終鎖定的是某個對象,這個對象生存在JVM中,自然鎖只能鎖單JVM。這一點很重要。如果你的服務只部署一個實例,那么恭喜你,用以下幾種鎖就可以了。 1.synchronized同步鎖 2.ReentrantLock重入鎖 3.ReadWriteLock讀寫鎖 4.StampedLock戳鎖 由于之前已經詳細分析過原理+使用,各位直接坐飛機吧:同步中的四種鎖synchronized、ReentrantLock、ReadWriteLock、StampedLock 3.分布式鎖(多服務節(jié)點,多進程)3.1基于數據庫鎖實現場景舉例: 賣商品,先查詢庫存>0,更新庫存-1。 1.悲觀鎖:select for update(一致性鎖定讀)
2.樂觀鎖:版本控制,選一個
3.2基于緩存實現(redis,memcached)原理:redisson開源jar包,提供了很多功能,其中就包含分布式鎖。是Redis官方推薦的頂級項目,官網飛機票 核心org.redisson.api.RLock接口封裝了分布式鎖的獲取和釋放。源碼如下: 1 @Override 2 public boolean tryLock(long waitTime, long leaseTime, TimeUnit unit) throws InterruptedException { 3 long time = unit.toMillis(waitTime); 4 long current = System.currentTimeMillis(); 5 final long threadId = Thread.currentThread().getId(); 6 Long ttl = tryAcquire(leaseTime, unit, threadId);//申請鎖,返回還剩余的鎖過期時間 7 // lock acquired 8 if (ttl == null) { 9 return true; 10 } 11 12 time -= (System.currentTimeMillis() - current); 13 if (time <= 0) { 14 acquireFailed(threadId); 15 return false; 16 } 17 18 current = System.currentTimeMillis(); 19 final RFuture<RedissonLockEntry> subscribeFuture = subscribe(threadId); 20 if (!await(subscribeFuture, time, TimeUnit.MILLISECONDS)) { 21 if (!subscribeFuture.cancel(false)) { 22 subscribeFuture.addListener(new FutureListener<RedissonLockEntry>() { 23 @Override 24 public void operationComplete(Future<RedissonLockEntry> future) throws Exception { 25 if (subscribeFuture.isSuccess()) { 26 unsubscribe(subscribeFuture, threadId); 27 } 28 } 29 }); 30 } 31 acquireFailed(threadId); 32 return false; 33 } 34 35 try { 36 time -= (System.currentTimeMillis() - current); 37 if (time <= 0) { 38 acquireFailed(threadId); 39 return false; 40 } 41 42 while (true) { 43 long currentTime = System.currentTimeMillis(); 44 ttl = tryAcquire(leaseTime, unit, threadId); 45 // lock acquired 46 if (ttl == null) { 47 return true; 48 } 49 50 time -= (System.currentTimeMillis() - currentTime); 51 if (time <= 0) { 52 acquireFailed(threadId); 53 return false; 54 } 55 56 // waiting for message 57 currentTime = System.currentTimeMillis(); 58 if (ttl >= 0 && ttl < time) { 59 getEntry(threadId).getLatch().tryAcquire(ttl, TimeUnit.MILLISECONDS); 60 } else { 61 getEntry(threadId).getLatch().tryAcquire(time, TimeUnit.MILLISECONDS); 62 } 63 64 time -= (System.currentTimeMillis() - currentTime); 65 if (time <= 0) { 66 acquireFailed(threadId); 67 return false; 68 } 69 } 70 } finally { 71 unsubscribe(subscribeFuture, threadId); 72 } 73 // return get(tryLockAsync(waitTime, leaseTime, unit)); 74 }
上述方法,調用加鎖的邏輯就是在tryAcquire(leaseTime, unit, threadId)中,如下圖: 1 private Long tryAcquire(long leaseTime, TimeUnit unit, long threadId) { 2 return get(tryAcquireAsync(leaseTime, unit, threadId));//tryAcquireAsync返回RFutrue 3 } tryAcquireAsync中commandExecutor.evalWriteAsync就是咱們加鎖核心方法了
1 <T> RFuture<T> tryLockInnerAsync(long leaseTime, TimeUnit unit, long threadId, RedisStrictCommand<T> command) { 2 internalLockLeaseTime = unit.toMillis(leaseTime); 3 4 return commandExecutor.evalWriteAsync(getName(), LongCodec.INSTANCE, command, 5 "if (redis.call('exists', KEYS[1]) == 0) then " + 6 "redis.call('hset', KEYS[1], ARGV[2], 1); " + 7 "redis.call('pexpire', KEYS[1], ARGV[1]); " + 8 "return nil; " + 9 "end; " + 10 "if (redis.call('hexists', KEYS[1], ARGV[2]) == 1) then " + 11 "redis.call('hincrby', KEYS[1], ARGV[2], 1); " + 12 "redis.call('pexpire', KEYS[1], ARGV[1]); " + 13 "return nil; " + 14 "end; " + 15 "return redis.call('pttl', KEYS[1]);", 16 Collections.<Object>singletonList(getName()), internalLockLeaseTime, getLockName(threadId)); 17 } 如上圖,已經到了redis命令了 加鎖:
1 // 檢查是否key已經被占用,如果沒有則設置超時時間和唯一標識,初始化value=1 2 if (redis.call('exists', KEYS[1]) == 0) 3 then 4 redis.call('hset', KEYS[1], ARGV[2], 1); //hset key field value 哈希數據結構 5 redis.call('pexpire', KEYS[1], ARGV[1]); //pexpire key expireTime 設置有效時間 6 return nil; 7 end; 8 // 如果鎖重入,需要判斷鎖的key field 都一直情況下 value 加一 9 if (redis.call('hexists', KEYS[1], ARGV[2]) == 1) 10 then 11 redis.call('hincrby', KEYS[1], ARGV[2], 1);//hincrby key filed addValue 加1 12 redis.call('pexpire', KEYS[1], ARGV[1]);//pexpire key expireTime重新設置超時時間 13 return nil; 14 end; 15 // 返回剩余的過期時間 16 return redis.call('pttl', KEYS[1]); 以上的方法,當返回空是,說明獲取到鎖,如果返回一個long數值(pttl 命令的返回值),說明鎖已被占用,通過返回剩余時間,外部可以做一些等待時間的判斷和調整。 不再分析解鎖步驟,直接貼上解鎖的redis 命令 解鎖: – KEYS[1] :需要加鎖的key,這里需要是字符串類型。 – KEYS[2] :redis消息的ChannelName,一個分布式鎖對應唯一的一個channelName:“redisson_lock__channel__{” + getName() + “}” – ARGV[1] :reids消息體,這里只需要一個字節(jié)的標記就可以,主要標記redis的key已經解鎖,再結合redis的Subscribe,能喚醒其他訂閱解鎖消息的客戶端線程申請鎖。 – ARGV[2] :鎖的超時時間,防止死鎖 – ARGV[3] :鎖的唯一標識,(UUID.randomUUID()) + “:” + threadId 1 // 如果key已經不存在,說明已經被解鎖,直接發(fā)布(publihs)redis消息 2 if (redis.call('exists', KEYS[1]) == 0) 3 then 4 redis.call('publish', KEYS[2], ARGV[1]);//publish ChannelName message向信道發(fā)送解鎖消息 5 return 1; 6 end; 7 // key和field不匹配,說明當前客戶端線程沒有持有鎖,不能主動解鎖。 8 if (redis.call('hexists', KEYS[1], ARGV[3]) == 0) 9 then 10 return nil; 11 end; 12 // 將value減1 13 local counter = redis.call('hincrby', KEYS[1], ARGV[3], -1); //hincrby key filed addValue 減1 14 // 如果counter>0說明鎖在重入,不能刪除key 15 if (counter > 0) 16 then 17 redis.call('pexpire', KEYS[1], ARGV[2]); 18 return 0; 19 else 20 // 刪除key并且publish 解鎖消息 21 redis.call('del', KEYS[1]); 22 redis.call('publish', KEYS[2], ARGV[1]); 23 return 1; 24 end; 25 return nil; 特點:邏輯并不復雜, 實現了可重入功能, 通過pub/sub功能來減少空轉,性能極高。 實現了Lock的大部分功能,支持強制解鎖。
實戰(zhàn):1.創(chuàng)建客戶端配置類: 這里我們最終只用了一種來測試,就是initSingleServerConfig單例模式。 1 package distributed.lock.redis; 2 3 import org.redisson.config.Config; 4 5 /** 6 * 7 * @ClassName:RedissionConfig 8 * @Description:自定義RedissionConfig初始化方法 9 * 支持自定義構造:單例模式,集群模式,主從模式,哨兵模式。 10 * 注:此處使用spring bean 配置文件保證bean單例,見applicationContext-redis.xml 11 * 大家也可以用工廠模式自己維護單例:本類生成RedissionConfig,再RedissonClient redisson = Redisson.create(config);這樣就可以創(chuàng)建RedissonClient 12 * @author diandian.zhang 13 * @date 2017年7月20日下午12:55:50 14 */ 15 public class RedissionConfig { 16 private RedissionConfig() { 17 } 18 19 public static Config initSingleServerConfig(String redisHost, String redisPort, String redisPassword) { 20 return initSingleServerConfig(redisHost, redisPort, redisPassword, 0); 21 } 22 23 /** 24 * 25 * @Description 使用單例模式初始化構造Config 26 * @param redisHost 27 * @param redisPort 28 * @param redisPassword 29 * @param redisDatabase redis db 默認0 (0~15)有redis.conf配置文件中參數來控制數據庫總數:database 16. 30 * @return 31 * @author diandian.zhang 32 * @date 2017年7月20日下午12:56:21 33 * @since JDK1.8 34 */ 35 public static Config initSingleServerConfig(String redisHost, String redisPort, String redisPassword,Integer redisDatabase) { 36 Config config = new Config(); 37 config.useSingleServer().setAddress(redisHost + ":" + redisPort) 38 .setPassword(redisPassword) 39 .setDatabase(redisDatabase);//可以不設置,看業(yè)務是否需要隔離 40 //RedissonClient redisson = Redisson.create(config); 41 return config; 42 } 43 44 /** 45 * 46 * @Description 集群模式 47 * @param masterAddress 48 * @param nodeAddressArray 49 * @return 50 * @author diandian.zhang 51 * @date 2017年7月20日下午3:29:32 52 * @since JDK1.8 53 */ 54 public static Config initClusterServerConfig(String masterAddress, String[] nodeAddressArray) { 55 String nodeStr = ""; 56 for(String slave:nodeAddressArray){ 57 nodeStr +=","+slave; 58 } 59 Config config = new Config(); 60 config.useClusterServers() 61 .setScanInterval(2000) // cluster state scan interval in milliseconds 62 .addNodeAddress(nodeStr); 63 return config; 64 } 65 66 /** 67 * 68 * @Description 主從模式 69 * @param masterAddress 一主 70 * @param slaveAddressArray 多從 71 * @return 72 * @author diandian.zhang 73 * @date 2017年7月20日下午2:29:38 74 * @since JDK1.8 75 */ 76 public static Config initMasterSlaveServerConfig(String masterAddress, String[] slaveAddressArray) { 77 String slaveStr = ""; 78 for(String slave:slaveAddressArray){ 79 slaveStr +=","+slave; 80 } 81 Config config = new Config(); 82 config.useMasterSlaveServers() 83 .setMasterAddress(masterAddress)//一主 84 .addSlaveAddress(slaveStr);//多從"127.0.0.1:26389", "127.0.0.1:26379" 85 return config; 86 } 87 88 /** 89 * 90 * @Description 哨兵模式 91 * @param masterAddress 92 * @param slaveAddressArray 93 * @return 94 * @author diandian.zhang 95 * @date 2017年7月20日下午3:01:35 96 * @since JDK1.8 97 */ 98 public static Config initSentinelServerConfig(String masterAddress, String[] sentinelAddressArray) { 99 String sentinelStr = ""; 100 for(String sentinel:sentinelAddressArray){ 101 sentinelStr +=","+sentinel; 102 } 103 Config config = new Config(); 104 config.useSentinelServers() 105 .setMasterName("mymaster") 106 .addSentinelAddress(sentinelStr); 107 return config; 108 } 109 110 111 }
2.分布式鎖實現類 1 package distributed.lock.redis; 2 3 4 5 import java.text.SimpleDateFormat; 6 import java.util.Date; 7 import java.util.concurrent.CountDownLatch; 8 import java.util.concurrent.TimeUnit; 9 10 import org.redisson.Redisson; 11 import org.redisson.api.RLock; 12 import org.redisson.api.RedissonClient; 13 import org.slf4j.Logger; 14 import org.slf4j.LoggerFactory; 15 16 17 public class RedissonTest { 18 private static final Logger logger = LoggerFactory.getLogger(RedissonTest.class); 19 static SimpleDateFormat time = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss"); 20 //這里可自定義多種模式,單例,集群,主從,哨兵模式。為了簡單這里使用單例模式 21 private static RedissonClient redissonClient = Redisson.create(RedissionConfig.initSingleServerConfig("192.168.50.107", "6379", "password")); 22 23 public static void main(String[] args) { 24 CountDownLatch latch = new CountDownLatch(3); 25 // key 26 String lockKey = "testkey20170802"; 27 try { 28 Thread t1 = new Thread(() -> { 29 doWithLock(lockKey,latch);//函數式編程 30 }, "t1"); 31 Thread t2 = new Thread(() -> { 32 doWithLock(lockKey,latch); 33 }, "t2"); 34 Thread t3 = new Thread(() -> { 35 doWithLock(lockKey,latch); 36 }, "t3"); 37 //啟動線程 38 t1.start(); 39 t2.start(); 40 t3.start(); 41 //等待全部完成 42 latch.await(); 43 System.out.println("3個線程都解鎖完畢,關閉客戶端!"); 44 redissonClient.shutdown(); 45 } catch (Exception e) { 46 e.printStackTrace(); 47 } 48 } 49 50 /** 51 * 52 * @Description 線程執(zhí)行函數體 53 * @param lockKey 54 * @author diandian.zhang 55 * @date 2017年8月2日下午3:37:32 56 * @since JDK1.8 57 */ 58 private static void doWithLock(String lockKey,CountDownLatch latch) { 59 try { 60 System.out.println("進入線程="+Thread.currentThread().getName()+":"+time.format(new Date())); 61 //獲取鎖,30秒內獲取到返回true,未獲取到返回false,60秒過后自動unLock 62 if (tryLock(lockKey, 30, 60, TimeUnit.SECONDS)) { 63 System.out.println(Thread.currentThread().getName() + " 獲取鎖成功!,執(zhí)行需要加鎖的任務"+time.format(new Date())); 64 Thread.sleep(2000L);//休息2秒模擬執(zhí)行需要加鎖的任務 65 //獲取鎖超時 66 }else{ 67 System.out.println(Thread.currentThread().getName() + " 獲取鎖超時!"+time.format(new Date())); 68 } 69 } catch (Exception e) { 70 e.printStackTrace(); 71 } finally { 72 try { 73 //釋放鎖 74 unLock(lockKey); 75 latch.countDown();//完成,計數器減一 76 } catch (Exception e) { 77 e.printStackTrace(); 78 } 79 } 80 } 81 82 /** 83 * 84 * @Description 獲取鎖,鎖waitTime時間內獲取到返回true,未獲取到返回false,租賃期leaseTime過后unLock(除非手動釋放鎖) 85 * @param key 86 * @param waitTime 87 * @param leaseTime 88 * @param timeUnit 89 * @return 90 * @author diandian.zhang 91 * @date 2017年8月2日下午3:24:09 92 * @since JDK1.8 93 */ 94 public static boolean tryLock(String key, long waitTime, long leaseTime, TimeUnit timeUnit) { 95 try { 96 //根據key獲取鎖實例,非公平鎖 97 RLock lock = redissonClient.getLock(key); 98 //在leaseTime時間內阻塞獲取鎖,獲取鎖后持有鎖直到leaseTime租期結束(除非手動unLock釋放鎖)。 99 return lock.tryLock(waitTime, leaseTime, timeUnit); 100 } catch (Exception e) { 101 logger.error("redis獲取分布式鎖異常;key=" + key + ",waitTime=" + waitTime + ",leaseTime=" + leaseTime + 102 ",timeUnit=" + timeUnit, e); 103 return false; 104 } 105 } 106 107 /** 108 * 109 * @Description 釋放鎖 110 * @param key 111 * @author diandian.zhang 112 * @date 2017年8月2日下午3:25:34 113 * @since JDK1.8 114 */ 115 public static void unLock(String key) { 116 RLock lock = redissonClient.getLock(key); 117 lock.unlock(); 118 System.out.println(Thread.currentThread().getName() + " 釋放鎖"+time.format(new Date())); 119 } 120 }
執(zhí)行結果如下:1 進入線程=t3:2017-08-02 16:33:19 2 進入線程=t1:2017-08-02 16:33:19 3 進入線程=t2:2017-08-02 16:33:19 4 t2 獲取鎖成功!,執(zhí)行需要加鎖的任務2017-08-02 16:33:19--->T2 19秒時獲取到鎖 5 t2 釋放鎖2017-08-02 16:33:21--->T2任務完成,21秒時釋放鎖 6 t1 獲取鎖成功!,執(zhí)行需要加鎖的任務2017-08-02 16:33:21--->T1 21秒時獲取到鎖 7 t1 釋放鎖2017-08-02 16:33:23--->T2任務完成,23秒時釋放鎖 8 t3 獲取鎖成功!,執(zhí)行需要加鎖的任務2017-08-02 16:33:23--->T3 23秒時獲取到鎖 9 t3 釋放鎖2017-08-02 16:33:25--->T2任務完成,25秒時釋放鎖 10 3個線程都解鎖完畢,關閉客戶端! 如上圖,3個線程共消耗25-19=6秒,驗證通過,確實互斥鎖住了。 我們用Redis Desktop Manger來看一下redis中數據:1 192.168.50.107:0>hgetall "testkey20170802"--->用key查詢hash所有的值 2 1) 159b46b3-8bc5-4447-ad57-c55fdd381384:30--->T2獲取到鎖field=uuid:線程號 3 2) 1 --->value=1代表重入次數為1 4 192.168.50.107:0>hgetall "testkey20170802"--->T2釋放鎖,T1獲取到鎖 5 1) 159b46b3-8bc5-4447-ad57-c55fdd381384:29 6 2) 1 7 192.168.50.107:0>hgetall "testkey20170802"--->T1釋放鎖,T3獲取到鎖 8 1) 159b46b3-8bc5-4447-ad57-c55fdd381384:31 9 2) 1 10 192.168.50.107:0>hgetall "testkey20170802"--->最后一次查詢時,T3釋放鎖,已無數據
2)基于zookeeper實現原理:每個客戶端(每個JVM內部共用一個客戶端實例)對某個方法加鎖時,在zookeeper上指定節(jié)點的目錄下,生成一個唯一的瞬時有序節(jié)點。判斷是否獲取鎖的方式很簡單,只需要判斷有序節(jié)點中序號最小的一個。當釋放鎖的時候,只需將這個瞬時節(jié)點刪除即可。 我們使用apache的Curator組件來實現,一般使用Client、Framework、Recipes三個組件。 curator下,InterProcessMutex可重入互斥公平鎖,源碼(curator-recipes-2.4.1.jar)注釋如下: A re-entrant mutex that works across JVMs. Uses Zookeeper to hold the lock. All processes in all JVMs that use the same lock path will achieve an inter-process critical section. Further, this mutex is "fair" - each user will get the mutex in the order requested (from ZK's point of view) 即一個在JVM上工作的可重入互斥鎖。使用ZK去持有這把鎖。在所有JVM中的進程組,只要使用相同的鎖路徑將會獲得進程間的臨界資源。進一步說,這個互斥鎖是公平的-因為每個線程將會根據請求順序獲得這個互斥量(對于ZK來說) 主要方法如下:
1 // 構造方法 2 public InterProcessMutex(CuratorFramework client, String path) 3 public InterProcessMutex(CuratorFramework client, String path, LockInternalsDriver driver) 4 // 通過acquire獲得鎖,并提供超時機制: 5 public void acquire() throws Exception 6 public boolean acquire(long time, TimeUnit unit) throws Exception 7 // 撤銷鎖 8 public void makeRevocable(RevocationListener<InterProcessMutex> listener) 9 public void makeRevocable(final RevocationListener<InterProcessMutex> listener, Executor executor)
我們主要分析核心獲取鎖acquire方法如下:
1 @Override 2 public boolean acquire(long time, TimeUnit unit) throws Exception 3 { 4 return internalLock(time, unit); 5 } 6 7 private boolean internalLock(long time, TimeUnit unit) throws Exception 8 { 9 /* 10 Note on concurrency: a given lockData instance 11 can be only acted on by a single thread so locking isn't necessary 12 */ 13 14 Thread currentThread = Thread.currentThread(); 15 //線程安全map:private final ConcurrentMap<Thread, LockData> threadData = Maps.newConcurrentMap(); 16 LockData lockData = threadData.get(currentThread); 17 if ( lockData != null ) 18 { 19 //這里實現了可重入,如果當前線程已經獲取鎖,計數+1,直接返回true 20 lockData.lockCount.incrementAndGet(); 21 return true; 22 } 23 //獲取鎖,核心方法 24 String lockPath = internals.attemptLock(time, unit, getLockNodeBytes()); 25 if ( lockPath != null ) 26 { //得到鎖,塞進線程安全map 27 LockData newLockData = new LockData(currentThread, lockPath); 28 threadData.put(currentThread, newLockData); 29 return true; 30 } 31 32 return false; 33 }
核心獲取鎖的方法attemptLock源碼如下: 1 String attemptLock(long time, TimeUnit unit, byte[] lockNodeBytes) throws Exception 2 { 3 final long startMillis = System.currentTimeMillis(); 4 final Long millisToWait = (unit != null) ? unit.toMillis(time) : null; 5 final byte[] localLockNodeBytes = (revocable.get() != null) ? new byte[0] : lockNodeBytes; 6 int retryCount = 0; 7 8 String ourPath = null; 9 boolean hasTheLock = false; 10 boolean isDone = false; 11 while ( !isDone ) 12 { 13 isDone = true; 14 15 try 16 { 17 if ( localLockNodeBytes != null ) 18 { 19 ourPath = client.create().creatingParentsIfNeeded().withProtection().withMode(CreateMode.EPHEMERAL_SEQUENTIAL).forPath(path, localLockNodeBytes); 20 } 21 else 22 { //創(chuàng)建瞬時節(jié)點(客戶端斷開連接時刪除),節(jié)點名追加自增數字 23 ourPath = client.create().creatingParentsIfNeeded().withProtection().withMode(CreateMode.EPHEMERAL_SEQUENTIAL).forPath(path); 24 } 自循環(huán)等待時間:
1 private boolean internalLockLoop(long startMillis, Long millisToWait, String ourPath) throws Exception 2 { 3 boolean haveTheLock = false; 4 boolean doDelete = false; 5 try 6 { 7 if ( revocable.get() != null ) 8 { 9 client.getData().usingWatcher(revocableWatcher).forPath(ourPath); 10 } 11 12 while ( (client.getState() == CuratorFrameworkState.STARTED) && !haveTheLock )//如果狀態(tài)是開始且未獲取到鎖 13 { 14 List<String> children = getSortedChildren();//獲取父節(jié)點下所有線程的子節(jié)點 15 String sequenceNodeName = ourPath.substring(basePath.length() + 1); // 獲取當前節(jié)點名稱 16 //核心方法:判斷是否獲取到鎖 17 PredicateResults predicateResults = driver.getsTheLock(client, children, sequenceNodeName, maxLeases); 18 if ( predicateResults.getsTheLock() )//獲取到鎖,置true,下一次循環(huán)退出 19 { 20 haveTheLock = true; 21 } 22 else//沒有索取到鎖 23 { 24 String previousSequencePath = basePath + "/" + predicateResults.getPathToWatch();//這里路徑是上一次獲取到鎖的持有鎖路徑 25 26 synchronized(this)//強制加鎖 27 { 1 @Override 2 public PredicateResults getsTheLock(CuratorFramework client, List<String> children, String sequenceNodeName, int maxLeases) throws Exception 3 { 4 int ourIndex = children.indexOf(sequenceNodeName);//先根據子節(jié)點名獲取children(所有子節(jié)點升序集合)中的索引 5 validateOurIndex(sequenceNodeName, ourIndex);//校驗如果索引為負值,即不存在該子節(jié)點 6 //maxLeases允許同時租賃的數量,這里源代碼寫死了1,但這種設計符合將來拓展,修改maxLeases即可滿足多租賃 7 boolean getsTheLock = ourIndex < maxLeases;//maxLeases=1,所以只有當index=0時才是true,即所有子節(jié)點中升序排序第一個最小值,即第一個請求過來的,這就是核心思想所在! 8 String pathToWatch = getsTheLock ? null : children.get(ourIndex - maxLeases);//獲取到鎖返回null,否則未獲取到鎖,獲取上一次的獲取到鎖的路徑。后面會監(jiān)視這個路徑用以喚醒請求線程 9 10 return new PredicateResults(pathToWatch, getsTheLock); 11 }
特點:1.可避免死鎖:zk瞬時節(jié)點(Ephemeral Nodes)生命周期和session一致,session結束,節(jié)點自動刪除。 實戰(zhàn):1.創(chuàng)建客戶端client 1 package distributed.lock.zk; 2 3 import java.text.SimpleDateFormat; 4 import java.util.Date; 5 import java.util.concurrent.TimeUnit; 6 7 import org.apache.curator.framework.CuratorFramework; 8 import org.apache.curator.framework.CuratorFrameworkFactory; 9 import org.apache.curator.framework.recipes.locks.InterProcessMutex; 10 import org.apache.curator.retry.ExponentialBackoffRetry; 11 import org.apache.curator.retry.RetryNTimes; 12 import org.jboss.netty.channel.StaticChannelPipeline; 13 import org.omg.CORBA.PRIVATE_MEMBER; 14 15 /** 16 * 17 * @ClassName:CuratorDistrLockTest 18 * @Description:Curator包實現zk分布式鎖:利用了zookeeper的臨時順序節(jié)點特性,一旦客戶端失去連接后,則就會自動清除該節(jié)點。 19 * @author diandian.zhang 20 * @date 2017年7月11日下午12:43:44 21 */ 22 23 public class CuratorDistrLock { 24 private static final String ZK_ADDRESS = "192.168.50.253:2181";//zk 25 private static final String ZK_LOCK_PATH = "/zktest";//path 26 static SimpleDateFormat time = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss"); 27 28 public static void main(String[] args) { 29 try { 30 //創(chuàng)建zk客戶端 31 // CuratorFramework client = CuratorFrameworkFactory.newClient(ZK_ADDRESS,new RetryNTimes(3, 1000)); 32 CuratorFramework client = CuratorFrameworkFactory.builder() 33 .connectString(ZK_ADDRESS) 34 .sessionTimeoutMs(5000) 35 .retryPolicy(new ExponentialBackoffRetry(1000, 10)) 36 .build(); 37 //開啟 38 client.start(); 39 System.out.println("zk client start successfully!"+time.format(new Date())); 40 41 Thread t1 = new Thread(() -> { 42 doWithLock(client);//函數式編程 43 }, "t1"); 44 Thread t2 = new Thread(() -> { 45 doWithLock(client); 46 }, "t2"); 47 Thread t3 = new Thread(() -> { 48 doWithLock(client); 49 }, "t3"); 50 //啟動線程 51 t1.start(); 52 t2.start(); 53 t3.start(); 54 } catch (Exception e) { 55 e.printStackTrace(); 56 } 57 } 58 59 /** 60 * 61 * @Description 線程執(zhí)行函數體 62 * @param client 63 * @param lock 64 * @author diandian.zhang 65 * @date 2017年7月12日下午6:00:53 66 * @since JDK1.8 67 */ 68 private static void doWithLock(CuratorFramework client) { 69 //依賴ZK生成的可重入互斥公平鎖(按照請求順序) 70 InterProcessMutex lock = new InterProcessMutex(client, ZK_LOCK_PATH); 71 try { 72 System.out.println("進入線程="+Thread.currentThread().getName()+":"+time.format(new Date())); 73 74 //花20秒時間嘗試獲取鎖 75 if (lock.acquire(20, TimeUnit.SECONDS)) { 76 System.out.println(Thread.currentThread().getName() + " 獲取鎖成功!,執(zhí)行需要加鎖的任務"+time.format(new Date())); 77 Thread.sleep(2000L);//休息2秒模擬執(zhí)行需要加鎖的任務 78 //獲取鎖超時 79 }else{ 80 System.out.println(Thread.currentThread().getName() + " 獲取鎖超時!"+time.format(new Date())); 81 } 82 } catch (Exception e) { 83 e.printStackTrace(); 84 } finally { 85 try { 86 //當前線程獲取到鎖,那么最后需要釋放鎖(實際上是刪除節(jié)點) 87 if (lock.isAcquiredInThisProcess()) { 88 lock.release(); 89 System.out.println(Thread.currentThread().getName() + " 釋放鎖"+time.format(new Date())); 90 } 91 } catch (Exception e) { 92 e.printStackTrace(); 93 } 94 } 95 } 96 97 }
執(zhí)行結果: zk client start successfully! 進入線程=t2:2017-07-13 11:13:23 進入線程=t1:2017-07-13 11:13:23 進入線程=t3:2017-07-13 11:13:23 t2 獲取鎖成功!,執(zhí)行需要加鎖的任務2017-07-13 11:13:23----》起始時間23秒 t2 釋放鎖2017-07-13 11:13:25 t3 獲取鎖成功!,執(zhí)行需要加鎖的任務2017-07-13 11:13:25----》驗證耗時2秒,T2執(zhí)行完,T3執(zhí)行 t3 釋放鎖2017-07-13 11:13:27 t1 獲取鎖成功!,執(zhí)行需要加鎖的任務2017-07-13 11:13:27----》驗證耗時2秒,T3執(zhí)行完,T1執(zhí)行 t1 釋放鎖2017-07-13 11:13:29----》驗證耗時2秒,T1執(zhí)行完,3個任務共耗時=29-23=6秒,驗證互斥鎖達到目標。 查看zookeeper節(jié)點: 1.客戶端連接 zkCli.sh -server 192.168.50.253:2181 2.查看節(jié)點 [zk: 192.168.50.253:2181(CONNECTED) 80] ls /-----》查看根目錄 [zk: 192.168.50.253:2181(CONNECTED) 81] ls /zktest -----》查看我們創(chuàng)建的子節(jié)點 [zk: 192.168.50.253:2181(CONNECTED) 82] ls /zktest-----》任務執(zhí)行完畢最終釋放了子節(jié)點 4.總結比較
=====附Redis命令=======
======參考====== |
|