分散式限流大作戰
開篇
一週一篇技術博文又來了,這周我們講點什麼呢?看標題就知道了,那就是分散式下的限流策略(實在不知道寫些什麼好呢),至於限流的用處,好處,和處理場景就不這裡贅述了(Google全是)。ok,凌雲小課堂正式開始啦,今天要介紹的主要是三種限流策略
1. 計數限流 2. 漏桶限流 3. 令牌桶限流
基本上述三種限流策略可以涵蓋大多數需要限流的場景。為了和廣大的技術博主不一樣(555),我們今天只講分散式下的限流實現,至於什麼是漏桶和令牌桶Google嘍。。。
計數限流(時間窗)
計數限流主要有兩種,一個是計數器限流,簡單粗暴不值一提,另一個就是本文要講的滑動時間窗限流法,我們粗暴的貼程式碼。。。
/** * 簡單時間窗限流策略: * 每X秒允許行為發生Y次 * * @param key * @param period * @param maxCount * @return */ public boolean isActionAllowed(String key, int period, int maxCount) { ShardedJedis jedis = pool.getResource(); long nowTs = System.currentTimeMillis(); try { ShardedJedisPipeline pipe = jedis.pipelined(); // 移除時間窗之前所有集合 pipe.zremrangeByScore(key, 0f, new Double(nowTs - period * 1000)); // 獲取視窗內的行為數量 Response<Long> count = pipe.zcard(key); pipe.sync(); // 是否超過限制 if (count.get() <= maxCount) { // 記錄行為 pipe.zadd(key, nowTs, "" + nowTs); // 設定zset過期時間,避免冷使用者持續佔用記憶體,過期時間等於時間視窗,再多寬限 1s pipe.expire(key, period + 1); pipe.sync(); return true; } } catch (Exception e) { log.error("redis timeLimit:Error", e); } finally { jedis.close(); } return false; }
這個限流策略是基於Redis實現的。主要精髓就是利用zset的score數學特性,將時間戳存於score中。處理邏輯如下
1.每次進件先移除時間窗(給定的時間週期)外的所有key 2.對比當前zset中key的總量與MaxCount確定是否放棄本次進件 3.如果同意zset中新增value 4.設定過期時間
這個實現方式在併發量不大的應用中是完全可以應付的,但是一旦併發量過大在第2步和第3步之間因為不是原子操作,所以可能出現key的總量突破最大限流數。所以一定要用在合適的場景中。為什麼不用lua實現原子操作呢?因為ShardedJedis不支援。。。默默哭泣中。
漏桶限流-令牌桶限流
漏桶和令牌桶是一對孿生兄弟,它兩都是流量整形,限流中的常用演算法,唯一區別呢就是令牌桶是允許突發流量的而漏桶嚴格限制流量速率。他們的實現甚至可能一摸一樣。。。
我們繼續暴力貼程式碼。。555
/** * 漏斗限流 * * @author Lingyun * @Date 2018-12-08 21:48 */ public class FunnelRateLimiter { private Map<String, Funnel> funnels = new HashMap<>(); public boolean isActionAllowed(String userId, String actionKey, int capacity, float leakingRate) { String key = String.format("%s:%s", userId, actionKey); Funnel funnel = funnels.get(key); if (funnel == null) { funnel = new Funnel(capacity, leakingRate); funnels.put(key, funnel); } return funnel.watering(1); // 需要1個quota } static class Funnel { /** * 漏斗容量 */ int capacity; /** * 漏嘴流水速率 */ float leakingRate; /** * 漏斗剩餘空間 */ int leftQuota; /** * 上一次進水時間 */ long leakingTs; public Funnel(int capacity, float leakingRate) { this.capacity = capacity; this.leakingRate = leakingRate; this.leftQuota = capacity; this.leakingTs = System.currentTimeMillis(); } /** * 空間修正 */ void makeSpace() { long nowTs = System.currentTimeMillis(); long deltaTs = nowTs - leakingTs;//距離上次進水時間差 int deltaQuota = (int) (deltaTs * leakingRate);//可騰出空間 if (deltaQuota < 0) { // 間隔時間過長,整數數字過大溢位 this.leftQuota = capacity; this.leakingTs = nowTs; return; } if (deltaQuota < 1) { // 可騰出空間過小,最小單位是1 return; } this.leftQuota += deltaQuota; this.leakingTs = nowTs; //判斷剩餘空間是否超過總容量 if (this.leftQuota > this.capacity) { this.leftQuota = this.capacity; } } boolean watering(int quota) { makeSpace(); if (this.leftQuota >= quota) {//判斷剩餘空間是否足夠 this.leftQuota -= quota; return true; } return false; } } }
以上就是漏桶限流的java實現,至於令牌桶限流程式碼,Google的Guava中有該演算法的實現(RateLimiter)使用還是非常簡單的(意思就是自行Google)。
說好的分散式呢。。。
漏桶演算法的分散式實現思路
將Funnel 物件的內容按欄位儲存到一個 hash 結構中,進水的時候將 hash 結構的欄位取出來進行邏輯運算後,再將新值回填到 hash 結構中就完成了一次行為頻度的檢測。
但是有個問題,我們無法保證整個過程的原子性。從 hash 結構中取值,然後在記憶體裡運算,再回填到 hash 結構,這三個過程無法原子化,意味著需要進行適當的加鎖控制。而一旦加鎖,就意味著會有加鎖失敗,加鎖失敗就需要選擇重試或者放棄。
如果重試的話,就會導致效能下降。如果放棄的話,就會影響使用者體驗。同時,程式碼的複雜度也跟著升高很多。這真是個艱難的選擇,我們該如何解決這個問題呢?救星來了!我們可以使用Redis-Cell,可惜的是Redis-Cell是一個模組,而模組特效是Redis4.0中帶來的。。。而公司的Redis還是可憐的2.X版本。。。
令牌桶分散式實現
我們公司使用的介面限流策略就是令牌桶限流。實現方式是Guava的RateLimit+zookeeper,zookeeper中儲存了閘道器中心的服務數量,各個服務均分介面流量,並對閘道器中心最高QPS也在每個伺服器上做均分。這樣的好處是限流令牌在本地儲存,不通過網路傳輸,每臺伺服器絕對的均分限流,當叢集中有服務下線也不會影響其他伺服器的限流閾值,不會因為一臺服務的下線流量集中導致其他服務的限流閾值上升,繼而出現雪崩。缺點就是不靈活,在高併發時負載不均衡的情況肯定會出現,但是因為每臺伺服器的限流閾值是一個定值,這就導致某些壓力較大的服務不能靈活的根據整個叢集的限流情況,調整限流閾值,只能拒絕服務。
暴力貼程式碼又來了。。。
/** * 限流校驗 * * @param key 限流快取物件key * @return 校驗通過返回true 反之返回false */ private boolean checkRateLimit(String key, Integer limiterConcurrency, Integer limiterTimeOut) { boolean valid = true; try { long startTime = System.currentTimeMillis(); int tmpCount = serverCount == 0 ? DEFAUL_SERVER_COUNT : serverCount; double permitsPerSecond = BigDecimal.valueOf(limiterConcurrency).divide(BigDecimal.valueOf(tmpCount), 2, RoundingMode.FLOOR).doubleValue(); RateLimiter rateLimiter = getRateLimiter(key, permitsPerSecond); if (limiterTimeOut == null || limiterTimeOut == 0) { valid = rateLimiter.tryAcquire(); } else { valid = rateLimiter.tryAcquire(limiterTimeOut, TimeUnit.MILLISECONDS); } long endTime = System.currentTimeMillis(); LOGGER.info(String.format("api介面限流校驗,key=%s,valid=%s,serverCount=%s,耗時=%s", key, valid, tmpCount, endTime - startTime)); } catch (Throwable e) { LOGGER.error(String.format("api介面限流校驗異常,key=%s", key), e); } return valid; } /** * 獲取限流物件 * * @param key限流物件快取KEY * @param permitsPerSecond * @return */ private RateLimiter getRateLimiter(String key, double permitsPerSecond) { RateLimiter rateLimiter = rateLimiterMap.get(key); if (rateLimiter == null) { synchronized (this) { rateLimiter = rateLimiterMap.get(key); if (rateLimiter == null) { rateLimiter = RateLimiter.create(permitsPerSecond); rateLimiterMap.put(key, rateLimiter); return rateLimiter; } } } double diff = permitsPerSecond - rateLimiter.getRate(); if (diff >= 0.01 || diff < 0) { LOGGER.info(String.format("api介面限流-併發量更新,key=%s,permitsPerSecond=%s", key, permitsPerSecond)); rateLimiter.setRate(permitsPerSecond); } return rateLimiter; }
又要快樂的結束了.....
寫到這裡時,分散式限流終於要結束了,開開心心的看了看(>﹏<)左耳朵耗子的限流設計博文,嗯,我真是渣.....