Storm Tick 元組
在某些情況下,Bolt 在執行某些操作之前需要將資料快取幾秒鐘,例如每隔5秒清理一次快取或在單個請求中將一批記錄插入資料庫。
Tick 元組是系統生成的(Storm生成的)元組,我們可以在每個 Bolt 級別配置它們。我們可以在編寫 Bolt 時在程式碼中配置 Tick 元組。
我們需要在 Bolt 中覆蓋以下方法以啟用 Tick 元組:
@Override public Map<String, Object> getComponentConfiguration() { Config conf = new Config(); int tickFrequencyInSeconds = 10; conf.put(Config.TOPOLOGY_TICK_TUPLE_FREQ_SECS, tickFrequencyInSeconds); return conf; }
在上面的程式碼中,我們將 Tick 元組配置為10秒。現在,Storm 會每10秒鐘生成一個 Tick 元組。
接下來建立 isTickTuple 方法來確定我們收到的元組是 Tick 元組還是正常元組:
protected static boolean isTickTuple(Tuple tuple) { return tuple.getSourceComponent().equals(Constants.SYSTEM_COMPONENT_ID) && tuple.getSourceStreamId().equals(Constants.SYSTEM_TICK_STREAM_ID); }
Tick 元組會與你正在處理的其他正常元組混合在一起,所以需要我們判斷元組的型別。
最後,在 Bolt 的 execute 方法中新增如下程式碼來判斷元組的型別進行處理:
@Override public void execute(Tuple tuple, BasicOutputCollector collector) { try { if (isTickTuple(tuple)) { // do tick tuple } else { // do normal tuple } // do your bolt stuff } catch (Exception e) { LOG.error("Bolt execute error: {}", e); collector.reportError(e); } }
現在你的 Bolt 每10秒就會收到一個 Tick 元組。
如果希望 Topology 中的每個 Bolt 都每隔一段時間做一些操作,那麼可以定義一個 Topology 全域性的 Tick,同樣是設定Config.TOPOLOGY_TICK_TUPLE_FREQ_SECS
的值:
Config conf = new Config(); conf.put(Config.TOPOLOGY_TICK_TUPLE_FREQ_SECS, 10);