目錄
1.分布式鎖介紹在計(jì)算機(jī)系統(tǒng)中,鎖作為一種控制并發(fā)的機(jī)制無(wú)處不在。 單機(jī)環(huán)境下,操作系統(tǒng)能夠在進(jìn)程或線程之間通過(guò)本地的鎖來(lái)控制并發(fā)程序的行為。而在如今的大型復(fù)雜系統(tǒng)中,通常采用的是分布式架構(gòu)提供服務(wù)。 分布式環(huán)境下,基于本地單機(jī)的鎖無(wú)法控制分布式系統(tǒng)中分開(kāi)部署客戶端的并發(fā)行為,此時(shí)分布式鎖就應(yīng)運(yùn)而生了。 一個(gè)可靠的分布式鎖應(yīng)該具備以下特性: 1.互斥性:作為鎖,需要保證任何時(shí)刻只能有一個(gè)客戶端(用戶)持有鎖 2.可重入: 同一個(gè)客戶端在獲得鎖后,可以再次進(jìn)行加鎖 3.高可用:獲取鎖和釋放鎖的效率較高,不會(huì)出現(xiàn)單點(diǎn)故障 4.自動(dòng)重試機(jī)制:當(dāng)客戶端加鎖失敗時(shí),能夠提供一種機(jī)制讓客戶端自動(dòng)重試 2.分布式鎖api接口/** * 分布式鎖 api接口 */ public interface DistributeLock { /** * 嘗試加鎖 * @param lockKey 鎖的key * @return 加鎖成功 返回uuid * 加鎖失敗 返回null * */ String lock(String lockKey); /** * 嘗試加鎖 (requestID相等 可重入) * @param lockKey 鎖的key * @param expireTime 過(guò)期時(shí)間 單位:秒 * @return 加鎖成功 返回uuid * 加鎖失敗 返回null * */ String lock(String lockKey, int expireTime); /** * 嘗試加鎖 (requestID相等 可重入) * @param lockKey 鎖的key * @param requestID 用戶ID * @return 加鎖成功 返回uuid * 加鎖失敗 返回null * */ String lock(String lockKey, String requestID); /** * 嘗試加鎖 (requestID相等 可重入) * @param lockKey 鎖的key * @param requestID 用戶ID * @param expireTime 過(guò)期時(shí)間 單位:秒 * @return 加鎖成功 返回uuid * 加鎖失敗 返回null * */ String lock(String lockKey, String requestID, int expireTime); /** * 嘗試加鎖,失敗自動(dòng)重試 會(huì)阻塞當(dāng)前線程 * @param lockKey 鎖的key * @return 加鎖成功 返回uuid * 加鎖失敗 返回null * */ String lockAndRetry(String lockKey); /** * 嘗試加鎖,失敗自動(dòng)重試 會(huì)阻塞當(dāng)前線程 (requestID相等 可重入) * @param lockKey 鎖的key * @param requestID 用戶ID * @return 加鎖成功 返回uuid * 加鎖失敗 返回null * */ String lockAndRetry(String lockKey, String requestID); /** * 嘗試加鎖 (requestID相等 可重入) * @param lockKey 鎖的key * @param expireTime 過(guò)期時(shí)間 單位:秒 * @return 加鎖成功 返回uuid * 加鎖失敗 返回null * */ String lockAndRetry(String lockKey, int expireTime); /** * 嘗試加鎖 (requestID相等 可重入) * @param lockKey 鎖的key * @param expireTime 過(guò)期時(shí)間 單位:秒 * @param retryCount 重試次數(shù) * @return 加鎖成功 返回uuid * 加鎖失敗 返回null * */ String lockAndRetry(String lockKey, int expireTime, int retryCount); /** * 嘗試加鎖 (requestID相等 可重入) * @param lockKey 鎖的key * @param requestID 用戶ID * @param expireTime 過(guò)期時(shí)間 單位:秒 * @return 加鎖成功 返回uuid * 加鎖失敗 返回null * */ String lockAndRetry(String lockKey, String requestID, int expireTime); /** * 嘗試加鎖 (requestID相等 可重入) * @param lockKey 鎖的key * @param expireTime 過(guò)期時(shí)間 單位:秒 * @param requestID 用戶ID * @param retryCount 重試次數(shù) * @return 加鎖成功 返回uuid * 加鎖失敗 返回null * */ String lockAndRetry(String lockKey, String requestID, int expireTime, int retryCount); /** * 釋放鎖 * @param lockKey 鎖的key * @param requestID 用戶ID * @return true 釋放自己所持有的鎖 成功 * false 釋放自己所持有的鎖 失敗 * */ boolean unLock(String lockKey, String requestID); } 3.基于redis的分布式鎖的簡(jiǎn)單實(shí)現(xiàn)3.1 基礎(chǔ)代碼當(dāng)前實(shí)現(xiàn)版本的分布式鎖基于redis實(shí)現(xiàn),使用的是jedis連接池來(lái)和redis進(jìn)行交互,并將其封裝為redisClient工具類(僅封裝了demo所需的少數(shù)接口) redisClient工具類:public class RedisClient { private static final Logger LOGGER = LoggerFactory.getLogger(RedisClient.class); private JedisPool pool; private static RedisClient instance = new RedisClient(); private RedisClient() { init(); } public static RedisClient getInstance(){ return instance; } public Object eval(String script, List<String> keys, List<String> args) { Jedis jedis = getJedis(); Object result = jedis.eval(script, keys, args); jedis.close(); return result; } public String get(final String key){ Jedis jedis = getJedis(); String result = jedis.get(key); jedis.close(); return result; } public String set(final String key, final String value, final String nxxx, final String expx, final int time) { Jedis jedis = getJedis(); String result = jedis.set(key, value, nxxx, expx, time); jedis.close(); return result; } private void init(){ Properties redisConfig = PropsUtil.loadProps("redis.properties"); int maxTotal = PropsUtil.getInt(redisConfig,"maxTotal",10); String ip = PropsUtil.getString(redisConfig,"ip","127.0.0.1"); int port = PropsUtil.getInt(redisConfig,"port",6379); JedisPoolConfig jedisPoolConfig = new JedisPoolConfig(); jedisPoolConfig.setMaxTotal(maxTotal); pool = new JedisPool(jedisPoolConfig, ip,port); LOGGER.info("連接池初始化成功 ip={}, port={}, maxTotal={}",ip,port,maxTotal); } private Jedis getJedis(){ return pool.getResource(); } } 所依賴的工具類:package util; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import java.io.FileNotFoundException; import java.io.IOException; import java.io.InputStream; import java.util.Properties; /** * @Author xiongyx * @Create 2018/4/11. */ public final class PropsUtil { private static final Logger LOGGER = LoggerFactory.getLogger(PropsUtil.class); /** * 讀取配置文件 * */ public static Properties loadProps(String fileName){ Properties props = null; InputStream is = null; try{ //:::絕對(duì)路徑獲得輸入流 is = Thread.currentThread().getContextClassLoader().getResourceAsStream(fileName); if(is == null){ //:::沒(méi)找到文件,拋出異常 throw new FileNotFoundException(fileName + " is not found"); } props = new Properties(); props.load(is); }catch(IOException e){ LOGGER.error("load propertis file fail",e); }finally { if(is != null){ try{ //:::關(guān)閉io流 is.close(); } catch (IOException e) { LOGGER.error("close input Stream fail",e); } } } return props; } /** * 獲取字符串屬性(默認(rèn)為空字符串) * */ public static String getString(Properties properties,String key){ //:::調(diào)用重載函數(shù) 默認(rèn)值為:空字符串 return getString(properties,key,""); } /** * 獲取字符串屬性 * */ public static String getString(Properties properties,String key,String defaultValue){ //:::key對(duì)應(yīng)的value數(shù)據(jù)是否存在 if(properties.containsKey(key)){ return properties.getProperty(key); }else{ return defaultValue; } } /** * 獲取int屬性 默認(rèn)值為0 * */ public static int getInt(Properties properties,String key){ //:::調(diào)用重載函數(shù),默認(rèn)為:0 return getInt(properties,key,0); } /** * 獲取int屬性 * */ public static int getInt(Properties properties,String key,int defaultValue){ //:::key對(duì)應(yīng)的value數(shù)據(jù)是否存在 if(properties.containsKey(key)){ return CastUtil.castToInt(properties.getProperty(key)); }else{ return defaultValue; } } /** * 獲取boolean屬性,默認(rèn)值為false */ public static boolean getBoolean(Properties properties,String key){ return getBoolean(properties,key,false); } /** * 獲取boolean屬性 */ public static boolean getBoolean(Properties properties,String key,boolean defaultValue){ //:::key對(duì)應(yīng)的value數(shù)據(jù)是否存在 if(properties.containsKey(key)){ return CastUtil.castToBoolean(properties.getProperty(key)); }else{ return defaultValue; } } } public final class CastUtil { /** * 轉(zhuǎn)為 string * */ public static String castToString(Object obj){ return castToString(obj,""); } /** * 轉(zhuǎn)為 string 提供默認(rèn)值 * */ public static String castToString(Object obj,String defaultValue){ if(obj == null){ return defaultValue; }else{ return obj.toString(); } } /** * 轉(zhuǎn)為 int * */ public static int castToInt(Object obj){ return castToInt(obj,0); } /** * 轉(zhuǎn)為 int 提供默認(rèn)值 * */ public static int castToInt(Object obj,int defaultValue){ if(obj == null){ return defaultValue; }else{ return Integer.parseInt(obj.toString()); } } /** * 轉(zhuǎn)為 double * */ public static double castToDouble(Object obj){ return castToDouble(obj,0); } /** * 轉(zhuǎn)為 double 提供默認(rèn)值 * */ public static double castToDouble(Object obj,double defaultValue){ if(obj == null){ return defaultValue; }else{ return Double.parseDouble(obj.toString()); } } /** * 轉(zhuǎn)為 long * */ public static long castToLong(Object obj){ return castToLong(obj,0); } /** * 轉(zhuǎn)為 long 提供默認(rèn)值 * */ public static long castToLong(Object obj,long defaultValue){ if(obj == null){ return defaultValue; }else{ return Long.parseLong(obj.toString()); } } /** * 轉(zhuǎn)為 boolean * */ public static boolean castToBoolean(Object obj){ return castToBoolean(obj,false); } /** * 轉(zhuǎn)為 boolean 提供默認(rèn)值 * */ public static boolean castToBoolean(Object obj,boolean defaultValue){ if(obj == null){ return defaultValue; }else{ return Boolean.parseBoolean(obj.toString()); } } } 初始化lua腳本 LuaScript.java:在分布式鎖初始化時(shí),使用init方法讀取lua腳本
View Code
單例的RedisDistributeLock基礎(chǔ)屬性public final class RedisDistributeLock implements DistributeLock { /** * 無(wú)限重試 * */ public static final int UN_LIMIT_RETRY = -1; private RedisDistributeLock() { LuaScript.init(); } private static DistributeLock instance = new RedisDistributeLock(); /** * 持有鎖 成功標(biāo)識(shí) * */ private static final Long ADD_LOCK_SUCCESS = 1L; /** * 釋放鎖 失敗標(biāo)識(shí) * */ private static final Integer RELEASE_LOCK_SUCCESS = 1; /** * 默認(rèn)過(guò)期時(shí)間 單位:秒 * */ private static final int DEFAULT_EXPIRE_TIME_SECOND = 300; /** * 默認(rèn)加鎖重試時(shí)間 單位:毫秒 * */ private static final int DEFAULT_RETRY_FIXED_TIME = 3000; /** * 默認(rèn)的加鎖浮動(dòng)時(shí)間區(qū)間 單位:毫秒 * */ private static final int DEFAULT_RETRY_TIME_RANGE = 1000; /** * 默認(rèn)的加鎖重試次數(shù) * */ private static final int DEFAULT_RETRY_COUNT = 30; /** * lockCount Key前綴 * */ private static final String LOCK_COUNT_KEY_PREFIX = "lock_count:"; public static DistributeLock getInstance(){ return instance; } } 3.2 加鎖實(shí)現(xiàn)使用redis實(shí)現(xiàn)分布式鎖時(shí),加鎖操作必須是原子操作,否則多客戶端并發(fā)操作時(shí)會(huì)導(dǎo)致各種各樣的問(wèn)題。詳情請(qǐng)見(jiàn):Redis分布式鎖的正確實(shí)現(xiàn)方式。 由于我們實(shí)現(xiàn)的是可重入鎖,加鎖過(guò)程中需要判斷客戶端ID的正確與否。而redis原生的簡(jiǎn)單接口沒(méi)法保證一系列邏輯的原子性執(zhí)行,因此采用了lua腳本來(lái)實(shí)現(xiàn)加鎖操作。lua腳本可以讓redis在執(zhí)行時(shí)將一連串的操作以原子化的方式執(zhí)行。 加鎖lua腳本 lock.lua-- 獲取參數(shù) local requestIDKey = KEYS[1] local currentRequestID = ARGV[1] local expireTimeTTL = ARGV[2] -- setnx 嘗試加鎖 local lockSet = redis.call('hsetnx',KEYS[1],'lockKey',currentRequestID) if lockSet == 1 then -- 加鎖成功 設(shè)置過(guò)期時(shí)間和重入次數(shù)=1 redis.call('expire',KEYS[1],expireTimeTTL) redis.call('hset',KEYS[1],'lockCount',1) return 1 else -- 判斷是否是重入加鎖 local oldRequestID = redis.call('hget',KEYS[1],'lockKey') if currentRequestID == oldRequestID then -- 是重入加鎖 redis.call('hincrby',KEYS[1],'lockCount',1) -- 重置過(guò)期時(shí)間 redis.call('expire',KEYS[1],expireTimeTTL) return 1 else -- requestID不一致,加鎖失敗 return 0 end end 加鎖方法實(shí)現(xiàn):加鎖時(shí),通過(guò)判斷eval的返回值來(lái)判斷加鎖是否成功。 @Override public String lock(String lockKey) { String uuid = UUID.randomUUID().toString(); return lock(lockKey,uuid); } @Override public String lock(String lockKey, int expireTime) { String uuid = UUID.randomUUID().toString(); return lock(lockKey,uuid,expireTime); } @Override public String lock(String lockKey, String requestID) { return lock(lockKey,requestID,DEFAULT_EXPIRE_TIME_SECOND); } @Override public String lock(String lockKey, String requestID, int expireTime) { RedisClient redisClient = RedisClient.getInstance(); List<String> keyList = Arrays.asList( lockKey ); List<String> argsList = Arrays.asList( requestID, expireTime + "" ); Long result = (Long)redisClient.eval(LuaScript.LOCK_SCRIPT, keyList, argsList); if(result.equals(ADD_LOCK_SUCCESS)){ return requestID; }else{ return null; } } 3.3 解鎖實(shí)現(xiàn)解鎖操作同樣需要一連串的操作,由于原子化操作的需求,因此同樣使用lua腳本實(shí)現(xiàn)解鎖功能。 解鎖lua腳本 unlock.lua-- 獲取參數(shù) local requestIDKey = KEYS[1] local currentRequestID = ARGV[1] -- 判斷requestID一致性 if redis.call('hget',KEYS[1],'lockKey') == currentRequestID then -- requestID相同,重入次數(shù)自減 local currentCount = redis.call('hincrby',KEYS[1],'lockCount',-1) if currentCount == 0 then -- 重入次數(shù)為0,刪除鎖 redis.call('del',KEYS[1]) return 1 else return 0 end else return 0 end 解鎖方法實(shí)現(xiàn):@Override public boolean unLock(String lockKey, String requestID) { List<String> keyList = Arrays.asList( lockKey ); List<String> argsList = Collections.singletonList(requestID); Object result = RedisClient.getInstance().eval(LuaScript.UN_LOCK_SCRIPT, keyList, argsList); // 釋放鎖成功 return RELEASE_LOCK_SUCCESS.equals(result); } 3.4 自動(dòng)重試機(jī)制實(shí)現(xiàn)調(diào)用lockAndRetry方法進(jìn)行加鎖時(shí),如果加鎖失敗,則當(dāng)前客戶端線程會(huì)短暫的休眠一段時(shí)間,并進(jìn)行重試。在重試了一定的次數(shù)后,會(huì)終止重試加鎖操作,從而加鎖失敗。 需要注意的是,加鎖失敗之后的線程休眠時(shí)長(zhǎng)是"固定值 + 隨機(jī)值",引入隨機(jī)值的主要目的是防止高并發(fā)時(shí)大量的客戶端在幾乎同一時(shí)間被喚醒并進(jìn)行加鎖重試,給redis服務(wù)器帶來(lái)周期性的、不必要的瞬時(shí)壓力。 @Override public String lockAndRetry(String lockKey) { String uuid = UUID.randomUUID().toString(); return lockAndRetry(lockKey,uuid); } @Override public String lockAndRetry(String lockKey, String requestID) { return lockAndRetry(lockKey,requestID,DEFAULT_EXPIRE_TIME_SECOND); } @Override public String lockAndRetry(String lockKey, int expireTime) { String uuid = UUID.randomUUID().toString(); return lockAndRetry(lockKey,uuid,expireTime); } @Override public String lockAndRetry(String lockKey, int expireTime, int retryCount) { String uuid = UUID.randomUUID().toString(); return lockAndRetry(lockKey,uuid,expireTime,retryCount); } @Override public String lockAndRetry(String lockKey, String requestID, int expireTime) { return lockAndRetry(lockKey,requestID,expireTime,DEFAULT_RETRY_COUNT); } @Override public String lockAndRetry(String lockKey, String requestID, int expireTime, int retryCount) { if(retryCount <= 0){ // retryCount小于等于0 無(wú)限循環(huán),一直嘗試加鎖 while(true){ String result = lock(lockKey,requestID,expireTime); if(result != null){ return result; } // 休眠一會(huì) sleepSomeTime(); } }else{ // retryCount大于0 嘗試指定次數(shù)后,退出 for(int i=0; i<retryCount; i++){ String result = lock(lockKey,requestID,expireTime); if(result != null){ return result; } // 休眠一會(huì) sleepSomeTime(); } return null; } } 4.使用注解切面簡(jiǎn)化redis分布式鎖的使用通過(guò)在方法上引入RedisLock注解切面,讓對(duì)應(yīng)方法被redis分布式鎖管理起來(lái),可以簡(jiǎn)化redis分布式鎖的使用。 切面注解 RedisLock@Target(ElementType.METHOD) @Retention(RetentionPolicy.RUNTIME) @Documented public @interface RedisLock { /** * 無(wú)限重試 * */ int UN_LIMIT_RETRY = RedisDistributeLock.UN_LIMIT_RETRY; String lockKey(); int expireTime(); int retryCount(); } RedisLock 切面實(shí)現(xiàn)@Component @Aspect public class RedisLockAspect { private static final Logger LOGGER = LoggerFactory.getLogger(RedisLockAspect.class); private static final ThreadLocal<String> REQUEST_ID_MAP = new ThreadLocal<>(); @Pointcut("@annotation(annotation.RedisLock)") public void annotationPointcut() { } @Around("annotationPointcut()") public Object around(ProceedingJoinPoint joinPoint) throws Throwable { MethodSignature methodSignature = (MethodSignature)joinPoint.getSignature(); Method method = methodSignature.getMethod(); RedisLock annotation = method.getAnnotation(RedisLock.class); boolean lockSuccess = lock(annotation); if(lockSuccess){ Object result = joinPoint.proceed(); unlock(annotation); return result; } return null; } /** * 加鎖 * */ private boolean lock(RedisLock annotation){ DistributeLock distributeLock = RedisDistributeLock.getInstance(); int retryCount = annotation.retryCount(); String requestID = REQUEST_ID_MAP.get(); if(requestID != null){ // 當(dāng)前線程 已經(jīng)存在requestID distributeLock.lockAndRetry(annotation.lockKey(),requestID,annotation.expireTime(),retryCount); LOGGER.info("重入加鎖成功 requestID=" + requestID); return true; }else{ // 當(dāng)前線程 不存在requestID String newRequestID = distributeLock.lockAndRetry(annotation.lockKey(),annotation.expireTime(),retryCount); if(newRequestID != null){ // 加鎖成功,設(shè)置新的requestID REQUEST_ID_MAP.set(newRequestID); LOGGER.info("加鎖成功 newRequestID=" + newRequestID); return true; }else{ LOGGER.info("加鎖失敗,超過(guò)重試次數(shù),直接返回 retryCount={}",retryCount); return false; } } } /** * 解鎖 * */ private void unlock(RedisLock annotation){ DistributeLock distributeLock = RedisDistributeLock.getInstance(); String requestID = REQUEST_ID_MAP.get(); if(requestID != null){ // 解鎖成功 boolean unLockSuccess = distributeLock.unLock(annotation.lockKey(),requestID); if(unLockSuccess){ // 移除 ThreadLocal中的數(shù)據(jù) REQUEST_ID_MAP.remove(); LOGGER.info("解鎖成功 requestID=" + requestID); } } } } 使用例子@Service("testService") public class TestServiceImpl implements TestService { @Override @RedisLock(lockKey = "lockKey", expireTime = 100, retryCount = RedisLock.UN_LIMIT_RETRY) public String method1() { return "method1"; } @Override @RedisLock(lockKey = "lockKey", expireTime = 100, retryCount = 3) public String method2() { return "method2"; } } 5.總結(jié)5.1 當(dāng)前版本缺陷主從同步可能導(dǎo)致鎖的互斥性失效 在redis主從結(jié)構(gòu)下,出于性能的考慮,redis采用的是主從異步復(fù)制的策略,這會(huì)導(dǎo)致短時(shí)間內(nèi)主庫(kù)和從庫(kù)數(shù)據(jù)短暫的不一致。 試想,當(dāng)某一客戶端剛剛加鎖完畢,redis主庫(kù)還沒(méi)有來(lái)得及和從庫(kù)同步就掛了,之后從庫(kù)中新選拔出的主庫(kù)是沒(méi)有對(duì)應(yīng)鎖記錄的,這就可能導(dǎo)致多個(gè)客戶端加鎖成功,破壞了鎖的互斥性。 休眠并反復(fù)嘗試加鎖效率較低 lockAndRetry方法在客戶端線程加鎖失敗后,會(huì)休眠一段時(shí)間之后再進(jìn)行重試。當(dāng)鎖的持有者持有鎖的時(shí)間很長(zhǎng)時(shí),其它客戶端會(huì)有大量無(wú)效的重試操作,造成系統(tǒng)資源的浪費(fèi)。 進(jìn)一步優(yōu)化時(shí),可以使用發(fā)布訂閱的方式。這時(shí)加鎖失敗的客戶端會(huì)監(jiān)聽(tīng)鎖被釋放的信號(hào),在鎖真正被釋放時(shí)才會(huì)進(jìn)行新的加鎖操作,從而避免不必要的輪詢操作,以提高效率。 不是一個(gè)公平的鎖 當(dāng)前實(shí)現(xiàn)版本中,多個(gè)客戶端同時(shí)對(duì)鎖進(jìn)行搶占時(shí),是完全隨機(jī)的,既不遵循先來(lái)后到的順序,客戶端之間也沒(méi)有加鎖的優(yōu)先級(jí)區(qū)別。 后續(xù)優(yōu)化時(shí)可以提供一個(gè)創(chuàng)建公平鎖的接口,能指定加鎖的優(yōu)先級(jí),內(nèi)部使用一個(gè)優(yōu)先級(jí)隊(duì)列維護(hù)加鎖客戶端的順序。公平鎖雖然效率稍低,但在一些場(chǎng)景能更好的控制并發(fā)行為。 5.2 經(jīng)驗(yàn)總結(jié)前段時(shí)間看了一篇關(guān)于redis分布式鎖的技術(shù)文章,發(fā)現(xiàn)自己對(duì)于分布式鎖的了解還很有限。紙上得來(lái)終覺(jué)淺,為了更好的掌握相關(guān)知識(shí),決定嘗試著自己實(shí)現(xiàn)一個(gè)demo級(jí)別的redis分布式鎖,通過(guò)這次實(shí)踐,更進(jìn)一步的學(xué)習(xí)了lua語(yǔ)言和redis相關(guān)內(nèi)容。 這篇博客的完整代碼在我的github上:https://github.com/1399852153/RedisDistributedLock,存在許多不足之處,請(qǐng)多多指教。 |
|