用redisson的分散式鎖實現主從選舉(leader election)
使用者數上升,服務要叢集,如何實現主從機制,並且當主服務掛掉或停機維護時,其它任意從服務可自動變成主服務?
程式猿A:這還不簡單,用zookeeper就行了,配上Apache curator更方便,直接幫你實現好Leader Election了。
程式猿B:嗯…zookeeper又要安裝個服務,不想只為了這個主從又引入一個新東西,我們已經有redis了,能不能基於redis來實現?
答案是:yes。
思路
開始前,先說一下基本的實現思路:
- 先有一個redisson的分散式鎖RLock,名稱為:leader-lock
- 所有服務在啟動的時候都去嘗試獲取leader鎖
- 獲取鎖成功的服務為主服務
- 未獲取鎖的其它服務為從服務
- 從服務每隔幾秒鐘一直去嘗試獲取leader鎖,當主服務掛掉或停機時,其中一個從服務就會獲取到鎖變成主服務
碼起來
分散式鎖的初始化
RLock leaderLock = redissonClient.getLock(“leader-lock”);
ElectionThread
有了鎖後,我們需要一個專門的執行緒用於獲取鎖
class ElectionThread extends Thread { private boolean isMaster = false; public ElectionThread() { setName("leader-election"); } @Override public void run() { while (!stop) { try { if (isMaster) { synchronized (masterLock) { //leader鎖獲取到了,就不需要再去獲取了,進入阻塞狀態 masterLock.wait(); } } else { //所有從服務嘗試獲取leader鎖,嘗試並等待一定時間,如果未獲取成功,就一直重試 isMaster = leaderLock.tryLock(WAIT_SECONDS, TimeUnit.SECONDS); if (isMaster) { //leader鎖獲取成功,當前服務為主服務 logger.info("got leadership"); } } } catch (InterruptedException e) { } } } //判斷leader鎖是否獲取成功 public boolean isMaster() { return isMaster; } }
tryHold
有了ElectionThread,需要提供一個方法啟動它去獲取鎖
public void tryHold(String leaderName) { //分散式鎖的初始化 leaderLock = redissonClient.getLock(leaderName); //啟動獲取鎖的執行緒 electionThread.start(); }
鎖釋放
鎖獲取到了,如果要釋放怎麼釋放?ElectionThread需要加上釋放鎖的邏輯
class ElectionThread extends Thread { private boolean isMaster = false; public ElectionThread() { setName("leader-election"); } @Override public void run() { while (!stop) { try { if (isMaster) { synchronized (masterLock) { //leader鎖獲取到了,就不需要再去獲取了,進入阻塞狀態 masterLock.wait(); } } else { //所有從服務嘗試獲取leader鎖,嘗試並等待一定時間,如果未獲取成功,就一直重試 isMaster = leaderLock.tryLock(WAIT_SECONDS, TimeUnit.SECONDS); if (isMaster) { //leader鎖獲取成功,當前服務為主服務 logger.info("got leadership"); } } } catch (InterruptedException e) { } } //如果leader鎖被當前執行緒佔用,就釋放鎖 if (leaderLock.isLocked() && leaderLock.isHeldByCurrentThread()) { leaderLock.unlock(); } if (isMaster) { isMaster = false; } } //判斷leader鎖是否獲取成功 public boolean isMaster() { return isMaster; } }
shutdown
等等,ElectionThread是加上了釋放鎖的邏輯了,但當ElectionThread得到鎖的時候,執行緒已經阻塞了,我們需要在外部喚醒ElectionThread執行緒並跳出while迴圈
public void shutdown() { //stop設為true,ElectionThread中的while迴圈即可退出 stop = true; try { synchronized (masterLock) { //喚醒ElectionThread masterLock.notifyAll(); } //等待ElectionThread死亡 electionThread.join(); } catch (InterruptedException e) { } logger.info("shutdown and give up leadership"); }
shutdown hook
有了shutdown方法後,我們再加個shutdownHook,就可以在jvm停止時呼叫shutdown方法,leader鎖就會被釋放
public void tryHold(String leaderName) { //分散式鎖的初始化 leaderLock = redissonClient.getLock(leaderName); //啟動獲取鎖的執行緒 electionThread.start(); //shutdown hook Runtime.getRuntime().addShutdownHook(new Thread(() -> shutdown())); }
jvm的shutdownHook是在服務正常退出的情況下才會生效,如果服務異常退出,會怎樣?leader鎖會釋放嗎?放心,redisson有個lockWatchdogTimeout配置,這個配置會讓redisson客戶端處於正常狀態的時候,給那些不會自動釋放的鎖延長過期時間,如果服務異常了,那些不會自動釋放的鎖由於沒有延長過期時間,會被redis自動清除,所以leader鎖即使在服務異常退出的情況下,也會自動釋放。
lockWatchdogTimeout
lockWatchdogTimeout的配置可以用程式碼方式設定,也可以用配置檔案方式設定,一定要注意它的單位是毫秒(我測試的時候設成了10,結果找了半天問題),預設值是30000毫秒,即30秒。
import org.redisson.config.Config //從yaml配置檔案中讀取配置 Config config = Config.fromYAML(configFile.getInputStream()); //程式碼方式設定,如果希望主從切換更快,時間可以設定成5秒 config.setLockWatchdogTimeout(5000)
應用
看起來差不多了,現在我們提供一個方法用於判斷當前服務是否是主服務
public boolean isMaster() { return electionThread.isMaster(); }
Ok,現在可以拿來用了,首先看下定時任務的情況,定時任務一般只想在主服務上執行,這時就可以這樣寫了
scheduler.scheduleAtFixedRate(() -> { if (!isMaster()) { //do nothing return; } //do something }, 0, 60, TimeUnit.MINUTES);
初始化問題
這個判斷方法能不能在鎖狀態初始化完成之前阻塞,這樣在類似上面的定時任務裡(如1小時)做判斷時,主服務不至於因為沒初始化,然後就得等到下個小時才能執行,我們修改兩個地方。
public boolean isMaster() { //這裡加上一個初始化鎖,當沒有初始化時,阻塞當前執行緒 synchronized (initLock) { if (!isInit) { try { initLock.wait(); } catch (InterruptedException e) { } } } return electionThread.isMaster(); } class ElectionThread extends Thread { private boolean isMaster = false; public ElectionThread() { setName("leader-election"); } @Override public void run() { while (!stop) { try { if (isMaster) { synchronized (masterLock) { if (isInit) { //獲得鎖且已經初始化過就進入阻塞狀態 masterLock.wait(); } else { //獲得鎖,還未設定初始化狀態,就等待一會兒,給執行緒機會設定初始化狀態 masterLock.wait(Duration.ofSeconds(WAIT_SECONDS).toMillis()); } } } else { isMaster = leaderLock.tryLock(WAIT_SECONDS, TimeUnit.SECONDS); if (isMaster) { logger.info("got leadership"); } } } catch (InterruptedException e) { } finally { //設定執行緒初始化狀態 synchronized (initLock) { if (!isInit) { //初始化完成喚醒呼叫isMaster方法處於阻塞狀態的執行緒,並設定初始化狀態 initLock.notifyAll(); isInit = true; } } } } if (leaderLock.isLocked() && leaderLock.isHeldByCurrentThread()) { leaderLock.unlock(); } if (isMaster) { isMaster = false; } } public boolean isMaster() { return isMaster; } }
從服務變成主服務
還有一種情況,我們需要考慮,當主服務掛了,從服務變成主服務時,在上面的1小時定時任務已經過了執行時間,現在想在從服務變成主服務時,馬上就執行任務,要怎麼辦?我們可以加上一個從服務變成主服務的監聽器
private List<ElectionListener> listeners = new ArrayList<>(); public interface ElectionListener { void onElected(); } public void addElectionListener(ElectionListener electionListener) { if (listeners.contains(electionListener)) { return; } listeners.add(electionListener); } private void notifyElected() { for (ElectionListener listener : listeners) { listener.onElected(); } } //ElectionThread程式碼片斷 isMaster = leaderLock.tryLock(WAIT_SECONDS, TimeUnit.SECONDS); if (isMaster) { logger.info("got leadership"); notifyElected(); }
總結
大功告成,最後看一下完整版的程式碼
LeaderElection
import org.redisson.api.RLock; import org.redisson.api.RedissonClient; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import java.time.Duration; import java.util.ArrayList; import java.util.List; import java.util.concurrent.TimeUnit; public class LeaderElection { private static final Logger logger = LoggerFactory.getLogger(LeaderElection.class); private static final int WAIT_SECONDS = 1; private RedissonClient redissonClient; private RLock leaderLock; private boolean stop = false; private boolean isInit = false; private Object masterLock = new Object(); private Object initLock = new Object(); private ElectionThread electionThread = new ElectionThread(); private List<ElectionListener> listeners = new ArrayList<>(); public void tryHold(String leaderName) { leaderLock = redissonClient.getLock(leaderName); electionThread.start(); Runtime.getRuntime().addShutdownHook(new Thread(() -> shutdown())); } public boolean isMaster() { synchronized (initLock) { if (!isInit) { try { initLock.wait(); } catch (InterruptedException e) { } } } return electionThread.isMaster(); } public void addElectionListener(ElectionListener electionListener) { if (listeners.contains(electionListener)) { return; } listeners.add(electionListener); } public void shutdown() { stop = true; try { synchronized (masterLock) { masterLock.notifyAll(); } electionThread.join(); listeners.clear(); } catch (InterruptedException e) { } logger.info("shutdown and give up leadership"); } class ElectionThread extends Thread { private boolean isMaster = false; public ElectionThread() { setName("leader-election"); } @Override public void run() { while (!stop) { try { if (isMaster) { synchronized (masterLock) { if (isInit) { masterLock.wait(); } else { masterLock.wait(Duration.ofSeconds(WAIT_SECONDS).toMillis()); } } } else { isMaster = leaderLock.tryLock(WAIT_SECONDS, TimeUnit.SECONDS); if (isMaster) { logger.info("got leadership"); notifyElected(); } } } catch (InterruptedException e) { } finally { synchronized (initLock) { if (!isInit) { initLock.notifyAll(); isInit = true; } } } } if (leaderLock.isLocked() && leaderLock.isHeldByCurrentThread()) { leaderLock.unlock(); } if (isMaster) { isMaster = false; } } public boolean isMaster() { return isMaster; } } private void notifyElected() { for (ElectionListener listener : listeners) { listener.onElected(); } } public void setRedissonClient(RedissonClient redissonClient) { this.redissonClient = redissonClient; } }
ElectionListener
public interface ElectionListener { void onElected(); }
在springboot中的配置應用
@Configuration public class LeaderElectionConfig { @Value("${spring.application.name}") private String appName; @Bean(destroyMethod = "shutdown") public LeaderElection leaderElection(RedissonClient redissonClient) { LeaderElection leaderElection = new LeaderElection(); leaderElection.setRedissonClient(redissonClient); leaderElection.tryHold("leader-lock-" + appName); return leaderElection; } }