實現一個秒殺系統
之前我寫了如何實現分散式鎖和分散式限流,這次我們繼續在這塊功能上推進,實現一個秒殺系統,採用spring boot 2.x + mybatis+ redis + swagger2 + lombok實現。
先說說基本流程,就是提供一個秒殺介面,然後針對秒殺介面進行限流,限流的方式目前我實現了兩種,上次實現的是累計計數方式,這次還有這個功能,並且我增加了令牌桶方式的lua指令碼進行限流。
然後不被限流的資料進來之後,加一把分散式鎖,獲取分散式鎖之後就可以對資料庫進行操作了。直接操作資料庫的方式可以,但是速度會比較慢,咱們直接通過一個初始化介面,將庫存資料放到快取中,然後對快取中的資料進行操作。寫庫的操作採用非同步方式,實現的方式就是將操作好的資料放入到佇列中,然後由另一個執行緒對佇列進行消費。當然,也可以將資料直接寫入mq中,由另一個執行緒進行消費,這樣也更穩妥。
好了,看一下專案的基本結構:
看一下入口controller類,入口類有兩個方法,一個是初始化訂單的方法,即秒殺開始的時候,秒殺接口才會有效,這個方法可以採用定時任務自動實現也可以。初始化後就可以呼叫placeOrder的方法了。在placeOrder上面有個自定義的註解DistriLimitAnno,這個是我在上篇文章寫的,用作限流使用。採用的方式目前有兩種,一種是使用計數方式限流,一種方式是令牌桶,上次使用了計數,咱們這次採用令牌桶方式實現。
package com.hqs.flashsales.controller; import com.hqs.flashsales.annotation.DistriLimitAnno; import com.hqs.flashsales.aspect.LimitAspect; import com.hqs.flashsales.lock.DistributedLock; import com.hqs.flashsales.limit.DistributedLimit; import com.hqs.flashsales.service.OrderService; import lombok.extern.slf4j.Slf4j; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.data.redis.core.RedisTemplate; import org.springframework.data.redis.core.script.RedisScript; import org.springframework.stereotype.Controller; import org.springframework.web.bind.annotation.GetMapping; import org.springframework.web.bind.annotation.PostMapping; import org.springframework.web.bind.annotation.ResponseBody; import javax.annotation.Resource; import java.util.Collections; /** * @author huangqingshi * @Date 2019-01-23 */ @Slf4j @Controller public class FlashSaleController { @Autowired OrderService orderService; @Autowired DistributedLock distributedLock; @Autowired LimitAspect limitAspect; //注意RedisTemplate用的String,String,後續所有用到的key和value都是String的 @Autowired RedisTemplate<String, String> redisTemplate; private static final String LOCK_PRE = "LOCK_ORDER"; @PostMapping("/initCatalog") @ResponseBody public String initCatalog(){ try { orderService.initCatalog(); } catch (Exception e) { log.error("error", e); } return "init is ok"; } @PostMapping("/placeOrder") @ResponseBody @DistriLimitAnno(limitKey = "limit", limit = 100, seconds = "1") public Long placeOrder(Long orderId) { Long saleOrderId = 0L; boolean locked = false; String key = LOCK_PRE + orderId; String uuid = String.valueOf(orderId); try { locked = distributedLock.distributedLock(key, uuid, "10" ); if(locked) { //直接操作資料庫 //saleOrderId = orderService.placeOrder(orderId); //操作快取 非同步操作資料庫 saleOrderId = orderService.placeOrderWithQueue(orderId); } log.info("saleOrderId:{}", saleOrderId); } catch (Exception e) { log.error(e.getMessage()); } finally { if(locked) { distributedLock.distributedUnlock(key, uuid); } } return saleOrderId; } }
令牌桶的方式比直接計數更加平滑,直接計數可能會瞬間達到最高值,令牌桶則把最高峰給削掉了,令牌桶的基本原理就是有一個桶裝著令牌,然後又一隊人排隊領取令牌,領到令牌的人就可以去做做自己想做的事情了,沒有領到令牌的人直接就走了(也可以重新排隊)。發令牌是按照一定的速度發放的,所以這樣在多人等令牌的時候,很多人是拿不到的。當桶裡邊的令牌在一定時間內領完後,則沒有令牌可領,都直接走了。如果過了一定的時間之後可以再次把令牌桶裝滿供排隊的人領。基本原理是這樣的,看一下指令碼簡單瞭解一下,裡邊有一個key和四個引數,第一個引數是獲取一個令牌桶的時間間隔,第二個引數是重新填裝令牌的時間(精確到毫秒),第三個是令牌桶的數量限制,第四個是隔多長時間重新填裝令牌桶。
-- bucket name local key = KEYS[1] -- token generate interval local intervalPerPermit = tonumber(ARGV[1]) -- grant timestamp local refillTime = tonumber(ARGV[2]) -- limit token count local limit = tonumber(ARGV[3]) -- ratelimit time period local interval = tonumber(ARGV[4]) local counter = redis.call('hgetall', key) if table.getn(counter) == 0 then -- first check if bucket not exists, if yes, create a new one with full capacity, then grant access redis.call('hmset', key, 'lastRefillTime', refillTime, 'tokensRemaining', limit - 1) -- expire will save memory redis.call('expire', key, interval) return 1 elseif table.getn(counter) == 4 then -- if bucket exists, first we try to refill the token bucket local lastRefillTime, tokensRemaining = tonumber(counter[2]), tonumber(counter[4]) local currentTokens if refillTime > lastRefillTime then -- check if refillTime larger than lastRefillTime. -- if not, it means some other operation later than this call made the call first. -- there is no need to refill the tokens. local intervalSinceLast = refillTime - lastRefillTime if intervalSinceLast > interval then currentTokens = limit redis.call('hset', key, 'lastRefillTime', refillTime) else local grantedTokens = math.floor(intervalSinceLast / intervalPerPermit) if grantedTokens > 0 then -- ajust lastRefillTime, we want shift left the refill time. local padMillis = math.fmod(intervalSinceLast, intervalPerPermit) redis.call('hset', key, 'lastRefillTime', refillTime - padMillis) end currentTokens = math.min(grantedTokens + tokensRemaining, limit) end else -- if not, it means some other operation later than this call made the call first. -- there is no need to refill the tokens. currentTokens = tokensRemaining end assert(currentTokens >= 0) if currentTokens == 0 then -- we didn't consume any keys redis.call('hset', key, 'tokensRemaining', currentTokens) return 0 else -- we take 1 token from the bucket redis.call('hset', key, 'tokensRemaining', currentTokens - 1) return 1 end else error("Size of counter is " .. table.getn(counter) .. ", Should Be 0 or 4.") end
看一下呼叫令牌桶lua的JAVA程式碼,也比較簡單:
public Boolean distributedRateLimit(String key, String limit, String seconds) { Long id = 0L; long intervalInMills = Long.valueOf(seconds) * 1000; long limitInLong = Long.valueOf(limit); long intervalPerPermit = intervalInMills / limitInLong; //Long refillTime = System.currentTimeMillis(); //log.info("呼叫redis執行lua指令碼, {} {} {} {} {}", "ratelimit", intervalPerPermit, refillTime, //limit, intervalInMills); try { id = redisTemplate.execute(rateLimitScript, Collections.singletonList(key), String.valueOf(intervalPerPermit), String.valueOf(System.currentTimeMillis()), String.valueOf(limitInLong), String.valueOf(intervalInMills)); } catch (Exception e) { log.error("error", e); } if(id == 0L) { return false; } else { return true; } }
建立兩張簡單表,一個庫存表,一個是銷售訂單表:
CREATE TABLE `catalog` ( `id` int(11) unsigned NOT NULL AUTO_INCREMENT, `name` varchar(50) NOT NULL DEFAULT '' COMMENT '名稱', `total` int(11) NOT NULL COMMENT '庫存', `sold` int(11) NOT NULL COMMENT '已售', `version` int(11) NULL COMMENT '樂觀鎖,版本號', PRIMARY KEY (`id`) ) ENGINE=InnoDB DEFAULT CHARSET=utf8; CREATE TABLE `sales_order` ( `id` int(11) unsigned NOT NULL AUTO_INCREMENT, `cid` int(11) NOT NULL COMMENT '庫存ID', `name` varchar(30) NOT NULL DEFAULT '' COMMENT '商品名稱', `create_time` timestamp NOT NULL DEFAULT CURRENT_TIMESTAMP ON UPDATE CURRENT_TIMESTAMP COMMENT '建立時間', PRIMARY KEY (`id`) ) ENGINE=InnoDB DEFAULT CHARSET=utf8;
基本已經準備完畢,然後啟動程式,開啟swagger(http://localhost:8080/swagger-ui.html#),執行初始化方法initCatalog:
日誌裡邊會輸出初始化的記錄內容,初始化庫存為1000:
初始化執行的方法,十分簡單,寫到快取中。
@Override public void initCatalog() { Catalog catalog = new Catalog(); catalog.setName("mac"); catalog.setTotal(1000L); catalog.setSold(0L); catalogMapper.insertCatalog(catalog); log.info("catalog:{}", catalog); redisTemplate.opsForValue().set(CATALOG_TOTAL + catalog.getId(), catalog.getTotal().toString()); redisTemplate.opsForValue().set(CATALOG_SOLD + catalog.getId(), catalog.getSold().toString()); log.info("redis value:{}", redisTemplate.opsForValue().get(CATALOG_TOTAL + catalog.getId())); handleCatalog(); }
我寫了一個測試類,啟動3000個執行緒,然後去進行下單請求:
package com.hqs.flashsales; import lombok.extern.slf4j.Slf4j; import org.junit.Test; import org.junit.runner.RunWith; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.boot.test.context.SpringBootTest; import org.springframework.boot.test.web.client.TestRestTemplate; import org.springframework.test.context.junit4.SpringRunner; import org.springframework.util.LinkedMultiValueMap; import org.springframework.util.MultiValueMap; import java.util.concurrent.TimeUnit; @Slf4j @RunWith(SpringRunner.class) @SpringBootTest(classes = FlashsalesApplication.class, webEnvironment = SpringBootTest.WebEnvironment.RANDOM_PORT) public class FlashSalesApplicationTests { @Autowired private TestRestTemplate testRestTemplate; @Test public void flashsaleTest() { String url = "http://localhost:8080/placeOrder"; for(int i = 0; i < 3000; i++) { try { TimeUnit.MILLISECONDS.sleep(20); new Thread(() -> { MultiValueMap<String, String> params = new LinkedMultiValueMap<>(); params.add("orderId", "1"); Long result = testRestTemplate.postForObject(url, params, Long.class); if(result != 0) { System.out.println("-------------" + result); } } ).start(); } catch (Exception e) { log.info("error:{}", e.getMessage()); } } } @Test public void contextLoads() { } }
然後開始執行測試程式碼,檢視一下測試日誌和程式日誌,均顯示賣了1000後直接顯示SOLD OUT了。分別看一下日誌和資料庫:
商品庫存catalog表和訂單明細表sales_order表,都是1000條,沒有問題。
總結:
通過採用分散式鎖和分散式限流,即可實現秒殺流程,當然分散式限流也可以用到很多地方,比如限制某些IP在多久時間訪問介面多少次,都可以的。令牌桶的限流方式使得請求可以得到更加平滑的處理,不至於瞬間把系統達到最高負載。在這其中其實還有一個小細節,就是Redis的鎖,單機情況下沒有任何問題,如果是叢集的話需要注意,一個key被hash到同一個slot的時候沒有問題,如果說擴容或者縮容的話,如果key被hash到不同的slot,程式可能會出問題。在寫程式碼的過程中還出現了一個小問題,就是寫controller的方法的時候,方法一定要宣告成public的,否則自定義的註解用不了,其他service的註解直接變為空,這個問題也是找了很久才找到。
好了程式碼地址:https://github.com/stonehqs/flashsales.git
歡迎拍磚~