服務介面的流量控制策略之RateLimit
一、場景描述
很多做服務介面的人或多或少的遇到這樣的場景,由於業務應用系統的負載能力有限,為了防止非預期的請求對系統壓力過大而拖垮業務應用系統。
也就是面對大流量時,如何進行流量控制?
服務介面的流量控制策略:分流、降級、限流等。本文討論下限流策略,雖然降低了服務介面的訪問頻率和併發量,卻換取服務介面和業務應用系統的高可用。
實際場景中常用的限流策略:
Nginx前端限流
按照一定的規則如帳號、IP、系統呼叫邏輯等在Nginx層面做限流
業務應用系統限流
1、客戶端限流
2、服務端限流
資料庫限流
紅線區,力保資料庫
二、常用的限流演算法
常用的限流演算法由:漏桶演算法和令牌桶演算法。本文不具體的詳細說明兩種演算法的原理,原理會在接下來的文章中做說明。
1、漏桶演算法
漏桶(Leaky Bucket)演算法思路很簡單,水(請求)先進入到漏桶裡,漏桶以一定的速度出水(介面有響應速率),當水流入速度過大會直接溢位(訪問頻率超過介面響應速率),然後就拒絕請求,可以看出漏桶演算法能強行限制資料的傳輸速率.示意圖如下:
可見這裡有兩個變數,一個是桶的大小,支援流量突發增多時可以存多少的水(burst),另一個是水桶漏洞的大小(rate)。
因為漏桶的漏出速率是固定的引數,所以,即使網路中不存在資源衝突(沒有發生擁塞),漏桶演算法也不能使流突發(burst)到埠速率.因此,漏桶演算法對於存在突發特性的流量來說缺乏效率.
2、令牌桶演算法
令牌桶演算法(Token Bucket)和 Leaky Bucket 效果一樣但方向相反的演算法,更加容易理解.隨著時間流逝,系統會按恆定1/QPS時間間隔(如果QPS=100,則間隔是10ms)往桶裡加入Token(想象和漏洞漏水相反,有個水龍頭在不斷的加水),如果桶已經滿了就不再加了.新請求來臨時,會各自拿走一個Token,如果沒有Token可拿了就阻塞或者拒絕服務.
令牌桶的另外一個好處是可以方便的改變速度. 一旦需要提高速率,則按需提高放入桶中的令牌的速率. 一般會定時(比如100毫秒)往桶中增加一定數量的令牌, 有些變種演算法則實時的計算應該增加的令牌的數量.
三、基於Redis功能的實現
簡陋的設計思路:假設一個使用者(用IP判斷)每分鐘訪問某一個服務介面的次數不能超過10次,那麼我們可以在Redis中建立一個鍵,並此時我們就設定鍵的過期時間為60秒,每一個使用者對此服務介面的訪問就把鍵值加1,在60秒內當鍵值增加到10的時候,就禁止訪問服務介面。在某種場景中新增訪問時間間隔還是很有必要的。
1)使用Redis的incr命令,將計數器作為Lua指令碼
local current current = redis.call("incr",KEYS[1]) if tonumber(current) == 1 then redis.call("expire",KEYS[1],1) end
Lua指令碼在Redis中執行,保證了incr和expire兩個操作的原子性。
2)使用Reids的列表結構代替incr命令
FUNCTION LIMIT_API_CALL(ip) current = LLEN(ip) IF current > 10 THEN ERROR "too many requests per second" ELSE IF EXISTS(ip) == FALSE MULTI RPUSH(ip,ip) EXPIRE(ip,1) EXEC ELSE RPUSHX(ip,ip) END PERFORM_API_CALL() END
Rate Limit使用Redis的列表作為容器,LLEN用於對訪問次數的檢查,一個事物中包含了RPUSH和EXPIRE兩個命令,用於在第一次執行計數是建立列表並設定過期時間,
RPUSHX在後續的計數操作中進行增加操作。
四、基於令牌桶演算法的實現
令牌桶演算法可以很好的支撐突然額流量的變化即滿令牌桶數的峰值。
import java.io.BufferedWriter; import java.io.FileOutputStream; import java.io.IOException; import java.io.OutputStreamWriter; import java.util.Random; import java.util.concurrent.ArrayBlockingQueue; import java.util.concurrent.Executors; import java.util.concurrent.ScheduledExecutorService; import java.util.concurrent.TimeUnit; import java.util.concurrent.locks.ReentrantLock; import com.google.common.base.Preconditions; import com.netease.datastream.util.framework.LifeCycle; 20 public class TokenBucket implements LifeCycle { // 預設桶大小個數 即最大瞬間流量是64M private static final int DEFAULT_BUCKET_SIZE = 1024 * 1024 * 64; // 一個桶的單位是1位元組 private int everyTokenSize = 1; // 瞬間最大流量 private int maxFlowRate; // 平均流量 private int avgFlowRate; // 佇列來快取桶數量:最大的流量峰值就是 = everyTokenSize*DEFAULT_BUCKET_SIZE 64M = 1 * 1024 * 1024 * 64 private ArrayBlockingQueue<Byte> tokenQueue = new ArrayBlockingQueue<Byte>(DEFAULT_BUCKET_SIZE); private ScheduledExecutorService scheduledExecutorService = Executors.newSingleThreadScheduledExecutor(); private volatile boolean isStart = false; private ReentrantLock lock = new ReentrantLock(true); private static final byte A_CHAR = 'a'; public TokenBucket() { } public TokenBucket(int maxFlowRate, int avgFlowRate) { this.maxFlowRate = maxFlowRate; this.avgFlowRate = avgFlowRate; } public TokenBucket(int everyTokenSize, int maxFlowRate, int avgFlowRate) { this.everyTokenSize = everyTokenSize; this.maxFlowRate = maxFlowRate; this.avgFlowRate = avgFlowRate; } public void addTokens(Integer tokenNum) { // 若是桶已經滿了,就不再家如新的令牌 for (int i = 0; i < tokenNum; i++) { tokenQueue.offer(Byte.valueOf(A_CHAR)); } } public TokenBucket build() { start(); return this; } /** * 獲取足夠的令牌個數 * * @return */ public boolean getTokens(byte[] dataSize) { Preconditions.checkNotNull(dataSize); Preconditions.checkArgument(isStart, "please invoke start method first !"); int needTokenNum = dataSize.length / everyTokenSize + 1;// 傳輸內容大小對應的桶個數 final ReentrantLock lock = this.lock; lock.lock(); try { boolean result = needTokenNum <= tokenQueue.size(); // 是否存在足夠的桶數量 if (!result) { return false; } int tokenCount = 0; for (int i = 0; i < needTokenNum; i++) { Byte poll = tokenQueue.poll(); if (poll != null) { tokenCount++; } } return tokenCount == needTokenNum; } finally { lock.unlock(); } } @Override public void start() { // 初始化桶佇列大小 if (maxFlowRate != 0) { tokenQueue = new ArrayBlockingQueue<Byte>(maxFlowRate); } // 初始化令牌生產者 TokenProducer tokenProducer = new TokenProducer(avgFlowRate, this); scheduledExecutorService.scheduleAtFixedRate(tokenProducer, 0, 1, TimeUnit.SECONDS); isStart = true; } @Override public void stop() { isStart = false; scheduledExecutorService.shutdown(); } @Override public boolean isStarted() { return isStart; } class TokenProducer implements Runnable { private int avgFlowRate; private TokenBucket tokenBucket; public TokenProducer(int avgFlowRate, TokenBucket tokenBucket) { this.avgFlowRate = avgFlowRate; this.tokenBucket = tokenBucket; } @Override public void run() { tokenBucket.addTokens(avgFlowRate); } } public static TokenBucket newBuilder() { return new TokenBucket(); } public TokenBucket everyTokenSize(int everyTokenSize) { this.everyTokenSize = everyTokenSize; return this; } public TokenBucket maxFlowRate(int maxFlowRate) { this.maxFlowRate = maxFlowRate; return this; } public TokenBucket avgFlowRate(int avgFlowRate) { this.avgFlowRate = avgFlowRate; return this; } private String stringCopy(String data, int copyNum) { StringBuilder sbuilder = new StringBuilder(data.length() * copyNum); for (int i = 0; i < copyNum; i++) { sbuilder.append(data); } return sbuilder.toString(); } public static void main(String[] args) throws IOException, InterruptedException { tokenTest(); } private static void arrayTest() { ArrayBlockingQueue<Integer> tokenQueue = new ArrayBlockingQueue<Integer>(10); tokenQueue.offer(1); tokenQueue.offer(1); tokenQueue.offer(1); System.out.println(tokenQueue.size()); System.out.println(tokenQueue.remainingCapacity()); } private static void tokenTest() throws InterruptedException, IOException { TokenBucket tokenBucket = TokenBucket.newBuilder().avgFlowRate(512).maxFlowRate(1024).build(); BufferedWriter bufferedWriter = new BufferedWriter(new OutputStreamWriter(new FileOutputStream("/tmp/ds_test"))); String data = "xxxx";// 四個位元組 for (int i = 1; i <= 1000; i++) { Random random = new Random(); int i1 = random.nextInt(100); boolean tokens = tokenBucket.getTokens(tokenBucket.stringCopy(data, i1).getBytes()); TimeUnit.MILLISECONDS.sleep(100); if (tokens) { bufferedWriter.write("token pass --- index:" + i1); System.out.println("token pass --- index:" + i1); } else { bufferedWriter.write("token rejuect --- index" + i1); System.out.println("token rejuect --- index" + i1); } bufferedWriter.newLine(); bufferedWriter.flush(); } bufferedWriter.close(); } }
五、示例
RateLimiter 使用Demo
package ratelimite; import com.google.common.util.concurrent.RateLimiter; public class RateLimiterDemo { public static void main(String[] args) { testNoRateLimiter(); testWithRateLimiter(); } public static void testNoRateLimiter() { Long start = System.currentTimeMillis(); for (int i = 0; i < 10; i++) { System.out.println("call execute.." + i); } Long end = System.currentTimeMillis(); System.out.println(end - start); } public static void testWithRateLimiter() { Long start = System.currentTimeMillis(); RateLimiter limiter = RateLimiter.create(10.0); // 每秒不超過10個任務被提交 for (int i = 0; i < 10; i++) { limiter.acquire(); // 請求RateLimiter, 超過permits會被阻塞 System.out.println("call execute.." + i); } Long end = System.currentTimeMillis(); System.out.println(end - start); } }
五、 Guava併發:ListenableFuture與RateLimiter示例
概念
ListenableFuture顧名思義就是可以監聽的Future,它是對Java原生Future的擴充套件增強。我們知道Future表示一個非同步計算任務,當任務完成時可以得到計算結果。如果我們希望一旦計算完成就拿到結果展示給使用者或者做另外的計算,就必須使用另一個執行緒不斷的查詢計算狀態。這樣做,程式碼複雜,而且效率低下。使用ListenableFuture Guava幫我們檢測Future是否完成了,如果完成就自動呼叫回撥函式,這樣可以減少併發程式的複雜度。
推薦使用第二種方法,因為第二種方法可以直接得到Future的返回值,或者處理錯誤情況。本質上第二種方法是通過調動第一種方法實現的,做了進一步的封裝。
另外ListenableFuture還有其他幾種內建實現:
SettableFuture:不需要實現一個方法來計算返回值,而只需要返回一個固定值來做為返回值,可以通過程式設定此Future的返回值或者異常資訊
CheckedFuture: 這是一個繼承自ListenableFuture介面,他提供了checkedGet()方法,此方法在Future執行發生異常時,可以丟擲指定型別的異常。
示例
import java.util.concurrent.Callable; import java.util.concurrent.ExecutionException; import java.util.concurrent.Executors; import java.util.concurrent.TimeUnit; import com.google.common.util.concurrent.FutureCallback; import com.google.common.util.concurrent.Futures; import com.google.common.util.concurrent.ListenableFuture; import com.google.common.util.concurrent.ListeningExecutorService; import com.google.common.util.concurrent.MoreExecutors; import com.google.common.util.concurrent.RateLimiter; public class ListenableFutureDemo { public static void main(String[] args) { testRateLimiter(); testListenableFuture(); } /** * RateLimiter類似於JDK的訊號量Semphore,他用來限制對資源併發訪問的執行緒數 */ public static void testRateLimiter() { ListeningExecutorService executorService = MoreExecutors.listeningDecorator(Executors.newCachedThreadPool()); RateLimiter limiter = RateLimiter.create(5.0); // 每秒不超過4個任務被提交 for (int i = 0; i < 10; i++) { limiter.acquire(); // 請求RateLimiter, 超過permits會被阻塞 final ListenableFuture<Integer> listenableFuture = executorService .submit(new Task("is "+ i)); } } public static void testListenableFuture() { ListeningExecutorService executorService = MoreExecutors .listeningDecorator(Executors.newCachedThreadPool()); final ListenableFuture<Integer> listenableFuture = executorService.submit(new Task("testListenableFuture")); //同步獲取呼叫結果 try { System.out.println(listenableFuture.get()); } catch (InterruptedException e1) { e1.printStackTrace(); } catch (ExecutionException e1) { e1.printStackTrace(); } //第一種方式 listenableFuture.addListener(new Runnable() { @Override public void run() { try { System.out.println("get listenable future's result " + listenableFuture.get()); } catch (InterruptedException e) { e.printStackTrace(); } catch (ExecutionException e) { e.printStackTrace(); } } }, executorService); //第二種方式 Futures.addCallback(listenableFuture, new FutureCallback<Integer>() { @Override public void onSuccess(Integer result) { System.out .println("get listenable future's result with callback " + result); } @Override public void onFailure(Throwable t) { t.printStackTrace(); } }); } } class Task implements Callable<Integer> { String str; public Task(String str){ this.str = str; } @Override public Integer call() throws Exception { System.out.println("call execute.." + str); TimeUnit.SECONDS.sleep(1); return 7; } }
guava版本
<dependency> <groupId>com.google.guava</groupId> <artifactId>guava</artifactId> <version>14.0.1</version> </dependency>
轉載自:https://blog.csdn.net/lovelichao12/article/details/73973929
原文釋出時間為:2018-08-02
本文作者: 背丶影