Redisson分散式鎖實現
1. 基本用法
<dependency> <groupId>org.redisson</groupId> <artifactId>redisson</artifactId> <version>3.8.2</version> </dependency>
Config config = new Config(); config.useClusterServers() .setScanInterval(2000) // cluster state scan interval in milliseconds .addNodeAddress("redis://127.0.0.1:7000", "redis://127.0.0.1:7001") .addNodeAddress("redis://127.0.0.1:7002"); RedissonClient redisson = Redisson.create(config); RLock lock = redisson.getLock("anyLock"); lock.lock(); try { ... } finally { lock.unlock(); }
針對上面這段程式碼,重點看一下Redisson是如何基於Redis實現分散式鎖的
Redisson中提供的加鎖的方法有很多,但大致類似,此處只看lock()方法
更多請參見 ofollow,noindex">https://github.com/redisson/redisson/wiki/8.-distributed-locks-and-synchronizers
2. 加鎖
可以看到,呼叫getLock()方法後實際返回一個RedissonLock物件,在RedissonLock物件的lock()方法主要呼叫tryAcquire()方法
由於leaseTime == -1,於是走tryLockInnerAsync()方法,這個方法才是關鍵
首先,看一下evalWriteAsync方法的定義
<T, R> RFuture<R> evalWriteAsync(String key, Codec codec, RedisCommand<T> evalCommandType, String script, List<Object> keys, Object ... params);
最後兩個引數分別是keys和params
實際呼叫是這樣的:
單獨將呼叫的那一段摘出來看
commandExecutor.evalWriteAsync(getName(), LongCodec.INSTANCE, command, "if (redis.call('exists', KEYS[1]) == 0) then " + "redis.call('hset', KEYS[1], ARGV[2], 1); " + "redis.call('pexpire', KEYS[1], ARGV[1]); " + "return nil; " + "end; " + "if (redis.call('hexists', KEYS[1], ARGV[2]) == 1) then " + "redis.call('hincrby', KEYS[1], ARGV[2], 1); " + "redis.call('pexpire', KEYS[1], ARGV[1]); " + "return nil; " + "end; " + "return redis.call('pttl', KEYS[1]);", Collections.<Object>singletonList(getName()), internalLockLeaseTime, getLockName(threadId));
結合上面的引數宣告,我們可以知道,這裡KEYS[1]就是getName(),ARGV[2]是getLockName(threadId)
假設前面獲取鎖時傳的name是“abc”,假設呼叫的執行緒ID是Thread-1,假設成員變數UUID型別的id是6f0829ed-bfd3-4e6f-bba3-6f3d66cd176c
那麼KEYS[1]=abc,ARGV[2]=6f0829ed-bfd3-4e6f-bba3-6f3d66cd176c:Thread-1
因此,這段指令碼的意思是
1、判斷有沒有一個叫“abc”的key
2、如果沒有,則在其下設定一個欄位為“6f0829ed-bfd3-4e6f-bba3-6f3d66cd176c:Thread-1”,值為“1”的鍵值對 ,並設定它的過期時間
3、如果存在,則進一步判斷“6f0829ed-bfd3-4e6f-bba3-6f3d66cd176c:Thread-1”是否存在,若存在,則其值加1,並重新設定過期時間
4、返回“abc”的生存時間(毫秒)
這裡用的資料結構是hash,hash的結構是: key 欄位1 值1 欄位2 值2 。。。
用在鎖這個場景下,key就表示鎖的名稱,也可以理解為臨界資源,欄位就表示當前獲得鎖的執行緒
所有競爭這把鎖的執行緒都要判斷在這個key下有沒有自己執行緒的欄位,如果沒有則不能獲得鎖,如果有,則相當於重入,欄位值加1(次數)
3. 解鎖
protected RFuture<Boolean> unlockInnerAsync(long threadId) { return commandExecutor.evalWriteAsync(getName(), LongCodec.INSTANCE, RedisCommands.EVAL_BOOLEAN, "if (redis.call('exists', KEYS[1]) == 0) then " + "redis.call('publish', KEYS[2], ARGV[1]); " + "return 1; " + "end;" + "if (redis.call('hexists', KEYS[1], ARGV[3]) == 0) then " + "return nil;" + "end; " + "local counter = redis.call('hincrby', KEYS[1], ARGV[3], -1); " + "if (counter > 0) then " + "redis.call('pexpire', KEYS[1], ARGV[2]); " + "return 0; " + "else " + "redis.call('del', KEYS[1]); " + "redis.call('publish', KEYS[2], ARGV[1]); " + "return 1; "+ "end; " + "return nil;", Arrays.<Object>asList(getName(), getChannelName()), LockPubSub.unlockMessage, internalLockLeaseTime, getLockName(threadId)); }
我們還是假設name=abc,假設執行緒ID是Thread-1
同理,我們可以知道
KEYS[1]是getName(),即KEYS[1]=abc
KEYS[2]是getChannelName(),即KEYS[2]=redisson_lock__channel:{abc}
ARGV[1]是LockPubSub.unlockMessage,即ARGV[1]=0
ARGV[2]是生存時間
ARGV[3]是getLockName(threadId),即ARGV[3]=6f0829ed-bfd3-4e6f-bba3-6f3d66cd176c:Thread-1
因此,上面指令碼的意思是:
1、判斷是否存在一個叫“abc”的key
2、如果不存在,向Channel中廣播一條訊息,廣播的內容是0,並返回1
3、如果存在,進一步判斷欄位6f0829ed-bfd3-4e6f-bba3-6f3d66cd176c:Thread-1是否存在
4、若欄位不存在,返回空,若欄位存在,則欄位值減1
5、若減完以後,欄位值仍大於0,則返回0
6、減完後,若欄位值小於或等於0,則廣播一條訊息,廣播內容是0,並返回1;
可以猜測,廣播0表示資源可用,即通知那些等待獲取鎖的執行緒現在可以獲得鎖了
4. 等待
以上是正常情況下獲取到鎖的情況,那麼當無法立即獲取到鎖的時候怎麼辦呢?
再回到前面獲取鎖的位置
@Override public void lockInterruptibly(long leaseTime, TimeUnit unit) throws InterruptedException { long threadId = Thread.currentThread().getId(); Long ttl = tryAcquire(leaseTime, unit, threadId); // lock acquired if (ttl == null) { return; } //訂閱 RFuture<RedissonLockEntry> future = subscribe(threadId); commandExecutor.syncSubscription(future); try { while (true) { ttl = tryAcquire(leaseTime, unit, threadId); // lock acquired if (ttl == null) { break; } // waiting for message if (ttl >= 0) { getEntry(threadId).getLatch().tryAcquire(ttl, TimeUnit.MILLISECONDS); } else { getEntry(threadId).getLatch().acquire(); } } } finally { unsubscribe(future, threadId); } //get(lockAsync(leaseTime, unit)); } protected static final LockPubSub PUBSUB = new LockPubSub(); protected RFuture<RedissonLockEntry> subscribe(long threadId) { return PUBSUB.subscribe(getEntryName(), getChannelName(), commandExecutor.getConnectionManager().getSubscribeService()); } protected void unsubscribe(RFuture<RedissonLockEntry> future, long threadId) { PUBSUB.unsubscribe(future.getNow(), getEntryName(), getChannelName(), commandExecutor.getConnectionManager().getSubscribeService()); }
這裡會訂閱Channel,當資源可用時可以及時知道,並搶佔,防止無效的輪詢而浪費資源
當資源可用用的時候,迴圈去嘗試獲取鎖,由於多個執行緒同時去競爭資源,所以這裡用了訊號量,對於同一個資源只允許一個執行緒獲得鎖,其它的執行緒阻塞
5. 小結
6. 其它相關