Redis的分散式鎖
一、鎖的作用
當多執行緒執行某一業務時(特別是對資料的更新、新增)等操作,可能就會出現多個執行緒對同一條資料進行修改。其最終的結果一
定與你期望的結果“不太一樣”,這就與需要一把鎖來控制執行緒排排隊了 - java內部為我們提供瞭解決方案,可以使用synchronized
或Lock等方式來實現。
但是在生產過程中,因為效能的關係,多數公司都會採用多臺伺服器來搭建”分散式”。一條請求過來之後,不一定會打到哪臺服務
器上,這就保證不了多臺伺服器的某一”關鍵業務”同一時間只會有一條執行緒進行執行。這時就需要一個“媒介”來充當"鎖"這個角色,這
個“媒介”需要滿足一些特性才能勝任這個工作:
(1)各個伺服器都可以對其進行獲取和操作(基於資料庫的實現方式);
(2)高效能的獲取和釋放鎖;
(3)具備非阻塞鎖特性,如果獲取不到鎖則立即返回失敗;
(4)具備鎖等待的特性,即沒有獲取到鎖將繼續等待獲取鎖;
(5)具有鎖失效機制,防止死鎖;
(6)具備可重入特性;
二、分散式鎖的一種實現
高效能、失效時間、可重入、非阻塞等特性,Redis都能滿足,所以基於Redis實現是解決分散式鎖的一種方式。基於Redis的分散式
鎖的實現思想:
A、加鎖
1、計算開始時間,防止一直獲取不到鎖而死迴圈;
2、根據系統名稱:業務名稱:主鍵ID來建立一個分散式鎖的key;
3、迴圈獲取分散式鎖(嘗試時間=當前時間-開始時間<5000毫秒);
4、使用setnx方法獲取分散式鎖(key,value - 當前系統時間+"$TRUE",失效時間 - 10毫秒);
5、如果設定成功則獲取分散式鎖成功,返回true;
6、如果獲取失敗則睡5毫秒,繼續嘗試,直到過了時間;
7、如果時間之內沒有獲取到分散式所,則返回失敗;
B、解鎖
1、根據系統號,業務名稱,主鍵拼接分散式鎖的key;
2、根據key獲取到真實的key(使用keys方法);
3、判斷真實的key是否為存在,是否唯一,如果否則為空;
4、如果key唯一則進行刪除;
栗子:
/** * Jedis的工具類. */ public class JedisUtil { private static final Logger LOG = LoggerFactory.getLogger(JedisUtil.class); private static Map<String, JedisUtil> uniqueInstance = new HashMap(); private JedisPool jedisPool; private static JedisUtil jedisUtil = null; public JedisUtil() { super(); } /** * 選擇需要連結哪個庫 * @param serverName */ public JedisUtil(String serverName) { if (StringUtils.isBlank(serverName)) { throw new RuntimeException("請指定Redis主或從庫!"); } else { //實際使用需從配置檔案中獲取 String ip = "127.0.0.1"; Integer port = 6379; String pwd = "test"; //僅僅是測試用 String redisPoolMaxActive = "30"; if ("master".equals(serverName)) { //連結到主庫 this.initialPool(ip, port, pwd, redisPoolMaxActive); }else{ //可以連結為別的Redis庫 } } } /** * 初始化jedisPool * @param ip * @param port * @param pwd * @param redisPoolMaxActive */ private void initialPool(String ip, int port, String pwd, String redisPoolMaxActive) { JedisPoolConfig config = new JedisPoolConfig(); //最大連線數,預設8個 config.setMaxTotal(256); //最大空閒連線數,預設8個 config.setMaxIdle(256); if (StringUtils.isNotBlank(redisPoolMaxActive)) { Integer redisPoolMaxActiveInt = Integer.valueOf(Integer.parseInt(redisPoolMaxActive)); if (redisPoolMaxActiveInt.intValue() > 512) { redisPoolMaxActiveInt = Integer.valueOf(512); } config.setMaxTotal(redisPoolMaxActiveInt.intValue()); config.setMaxIdle(redisPoolMaxActiveInt.intValue()); } //獲取連線時的最大等待毫秒數 config.setMaxWaitMillis(1000L); //在獲取連線的時候檢查有效性,預設false config.setTestOnBorrow(true); config.setTestOnReturn(true); //在空閒時檢查有效性,預設false config.setTestWhileIdle(true); //每次逐出檢查時 逐出的最大數目。如果為負數就是: 1/abs(n), 預設3 config.setNumTestsPerEvictionRun(-1); //逐出掃描的時間間隔(毫秒) 如果為負數,則不執行逐出執行緒, 預設-1 config.setTimeBetweenEvictionRunsMillis(30000L); //逐出連線的最小空閒時間,預設1800000毫秒(30分鐘) config.setMinEvictableIdleTimeMillis(720000L); //物件空閒多久後逐出,當空閒時間>該值且空閒連線 > 最大空閒數時直接逐出,不再根據MinEvictableIdleTimeMillis判斷 (預設逐出策略) config.setSoftMinEvictableIdleTimeMillis(360000L); this.jedisPool = new JedisPool(config, ip, port, 2000, pwd); LogUtils.info(LOG, "jedisPool init finish", new Object[]{"jedisPool", this.jedisPool}); } /** * 單例獲取jedisPool * @param serverName Redis庫的名稱 * @return */ public static JedisUtil getInstance(String serverName) { jedisUtil = (JedisUtil)uniqueInstance.get(serverName); if(jedisUtil == null) { Class var1 = JedisUtil.class; synchronized(JedisUtil.class) { jedisUtil = (JedisUtil)uniqueInstance.get(serverName); if(jedisUtil == null) { jedisUtil = new JedisUtil(serverName); uniqueInstance.put(serverName, jedisUtil); } } } return jedisUtil; } public Long setnx(String key, String value, int expireTime, int dbIndex) { Jedis jedis = null; Long var1; try { jedis = this.jedisPool.getResource(); jedis.select(dbIndex); Long ret = jedis.setnx(key, value); if(ret.longValue() == 1L && expireTime > 0) { jedis.expire(key, expireTime); } var1 = ret; } catch (Exception var11) { LogUtils.error(LOG, "redis setnx error. ", var11, new Object[0]); throw var11; } finally { if(jedis != null) { jedis.close(); } } return var1; } /** * 查詢key * @param sKey * @param index * @return */ public Set<String> key(String sKey, int index) { Jedis jedis = null; Set var2; try { jedis = this.jedisPool.getResource(); jedis.select(index); var2 = jedis.keys(sKey); } catch (Exception var8) { LogUtils.error(LOG, "redis key error. ", var8, new Object[0]); throw var8; } finally { if(jedis != null) { jedis.close(); } } return var2; } /** * 刪除key * @param sKey * @param index * @return */ public Long del(String sKey, int index) { Jedis jedis = null; Long var3; try { jedis = this.jedisPool.getResource(); jedis.select(index); var3 = jedis.del(sKey); } catch (Exception var8) { LogUtils.error(LOG, "redis del error. ", var8, new Object[0]); throw var8; } finally { if(jedis != null) { jedis.close(); } } return var3; } }
分散式鎖的工具類:
public class LockRedisService { private static final Logger LOG = LoggerFactory.getLogger(LockRedisService.class); private static final Integer EXPIRE_SECOND = Integer.valueOf(10); public LockRedisService() {} /** * 嘗試獲取分散式鎖 * @param systemName 系統名稱 * @param business業務名稱 * @param keyfix主鍵 * @return */ public static Boolean tryLock(String systemName, String business, String keyfix) { long start = System.currentTimeMillis(); String key = String.format("lock:%s:%s:%s", new Object[]{systemName, business, keyfix}); LogUtils.info(LOG, "嘗試獲取分散式鎖", new Object[]{"key", key}); do { try { long ret = JedisUtil.getInstance("master").setnx(key, System.currentTimeMillis() + "$TRUE", EXPIRE_SECOND, 1).longValue(); if(ret == 1L) { LogUtils.info(LOG, "成功獲得分散式鎖", new Object[]{"key", key}); return Boolean.TRUE; } Thread.sleep(5L); } catch (Exception var8) { LogUtils.error(LOG, "獲取鎖失敗", var8, new Object[0]); } } while(System.currentTimeMillis() - start < 5000L); LogUtils.warn(LOG, "獲取分散式鎖失敗", new Object[]{"key", key}); return Boolean.FALSE; } /** * 解鎖 * @param systemName 系統名稱 * @param business業務名稱 * @param keyfix主鍵 * @return */ public static Boolean unlock(String systemName, String business, String keyfix) { String key = String.format("lock:%s:%s:%s", new Object[]{systemName, business, keyfix}); try { //1為使用哪個Redis庫,正常應該寫為列舉類 Set<String> realKeySet = JedisUtil.getInstance("master").key(key,1); if(CollectionUtils.isEmpty(realKeySet)) { return null; } else if(realKeySet.size() > 1) { LogUtils.warn(LOG, "lock key 萬用字元存在多個", new Object[]{"key", key}); return null; } else { String realKey = (String)realKeySet.iterator().next(); JedisUtil.getInstance("master").del(realKey, 1); } } catch (Exception var5) { LogUtils.error(LOG, "解鎖失敗", var5, new Object[0]); } LogUtils.info(LOG, "解鎖成功", new Object[]{"key", key}); return Boolean.valueOf(true); } }
測試類:
public class Test { public static void main(String[] args) { //獲取cpu的核心數 int count = Runtime.getRuntime().availableProcessors(); //建立執行緒池 ThreadPoolExecutor threadPool = new ThreadPoolExecutor(count, count * 10, 60L, TimeUnit.MILLISECONDS, new LinkedBlockingQueue<>(40960), new ThreadPoolExecutor.AbortPolicy()); //多執行緒執行業務 threadPool.execute(()->{ String bussiness = "測試分散式鎖"; String key = "TestLock:" + 123; //嘗試獲取分散式鎖 if (LockRedisService.tryLock("測試系統", bussiness, key)) { System.out.println("---獲取分散式鎖成功---"); try { //可能會丟擲異常 System.out.println("執行邏輯"); }catch (Exception e){ System.out.println("執行邏輯錯誤: "+e); }finally { //執行完畢後解鎖 LockRedisService.unlock("測試系統", bussiness, key); } }else { System.out.println("獲取分散式鎖失敗"); } }); } }