Java使用UDP傳送資料到InfluxDB
最近在做壓測引擎相關的開發,需要將聚合資料傳送到InfluxDB儲存以便實時分析和控制QPS。
下面介紹對InfluxDB的使用。
什麼是InfluxDB
InfluxDB是一款用Go語言編寫的開源分散式時序、事件和指標資料庫,無需外部依賴。該資料庫現在主要用於儲存涉及大量的時間戳資料,如DevOps監控資料,APP metrics, loT感測器資料和實時分析資料。
InfluxDB特徵:
- 無結構(無模式):可以是任意數量的列(tags)。
- 可以設定metric的儲存時間。
- 支援與時間有關的相關函式(如min、max、sum、count、mean、median等),方便統計。
- 支援儲存策略:可以用於資料的刪改(influxDB沒有提供資料的刪除與修改方法)。
- 支援連續查詢:是資料庫中自動定時啟動的一組語句,和儲存策略搭配可以降低InfluxDB的系統佔用量。
- 原生的HTTP支援,內建HTTP API。
- 支援類似SQL語法。
- 支援設定資料在叢集中的副本數。
- 支援定期取樣資料,寫入另外的measurement,方便分粒度儲存資料。
- 自帶web管理介面,方便使用(登入方式:http://< InfluxDB-IP >:8083)。
- 支援Grafana畫圖展示。
PS:有了InfluxDB+Grafana後,你就可以寫一些簡單的程式了,可以只負責寫後端邏輯部分,資料都可以存入InfluxDB,然後通過Grafana展示出來。
Mac安裝InfluxDB
# 安裝 brew install influxdb # 啟動 influxd -config /usr/local/etc/influxdb.conf # 檢視influxdb執行配置 influxd config # 啟動客戶端 influx -precision rfc3339
InfluxDB開啟UDP配置
vim /usr/local/etc/influxdb.conf
開啟udp配置,其他為預設值
[[udp]] enabled = true
udp配置含義:
[[udp]] – udp配置 enabled:是否啟用該模組,預設值:false。 bind-address:繫結地址,預設值:”:8089″。 database:資料庫名稱,預設值:”udp”。 retention-policy:儲存策略,無預設值。 batch-size:預設值:5000。 batch-pending:預設值:10。 read-buffer:udp讀取buffer的大小,0表示使用作業系統提供的值,如果超過作業系統的預設配置則會出錯。 該配置的預設值:0。 batch-timeout:超時時間,預設值:”1s”。 precision:時間精度,無預設值。
Java傳送UDP資料報
我們知道InfluxDB是支援Http的,為什麼我們還要採用UDP方式傳送資料呢?
基於下列原因:
- TCP資料傳輸慢,UDP資料傳輸快。
- 網路頻寬需求較小,而實時性要求高。
- InfluxDB和伺服器在同機房,發生資料丟包的可能性較小,即使真的發生丟包,對整個請求流量的收集影響也較小。
我們採用了worker執行緒呼叫addMetric
方法將資料儲存到快取map
中,send執行緒池來進行每個指定時間傳送資料到Influxdb。
程式碼如下(也可參考Jmeter
的UdpMetricsSender
類):
@Slf4j public class InfluxDBClient implements Runnable { private String measurement = "example"; private final Object lock = new Object(); private InetAddress hostAddress; private int udpPort; private volatile Map<String, List<Response>> metrics = new HashMap<>(); private long time; private String transaction; public InfluxDBClient(String influxdbUrl, String transaction) { this.transaction = transaction; try { log.debug("Setting up with url:{}", influxdbUrl); String[] urlComponents = influxdbUrl.split(":"); if (urlComponents.length == 2) { hostAddress = InetAddress.getByName(urlComponents[0]); udpPort = Integer.parseInt(urlComponents[1]); } else { throw new IllegalArgumentException("InfluxDBClient url '" + influxdbUrl + "' is wrong. The format shoule be <host/ip>:<port>"); } } catch (Exception e) { throw new IllegalArgumentException("InfluxDBClient url '" + influxdbUrl + "' is wrong. The format shoule be <host/ip>:<port>", e); } } public void addMetric(Response response) { synchronized (lock) { if (metrics.containsKey(response.getLabel())) { metrics.get(response.getLabel()).add(response); } else { metrics.put(response.getLabel(), new ArrayList<>(Collections.singletonList(response))); } } } @Override public void run() { sendMetrics(); } private void sendMetrics() { Map<String, List<Response>> tempMetrics; //複製資料到tempMetrics,清空原來metrics並初始化上次的大小 synchronized (lock) { if (isEmpty(metrics)) { return; } time = System.currentTimeMillis(); tempMetrics = metrics; metrics = new HashMap<>(); for (Map.Entry<String, List<Response>> entry : tempMetrics.entrySet()) { metrics.put(entry.getKey(), new ArrayList<>(entry.getValue().size())); } } final Map<String, List<Response>> copyMetrics = tempMetrics; final List<MetricTuple> aggregateMetrics = aggregate(copyMetrics); StringBuilder sb = new StringBuilder(aggregateMetrics.size() * 200); //傳送tempMetrics,生成一行資料,然後換行 for (MetricTuple metric : aggregateMetrics) { sb.append(metric.getMeasurement()).append(metric.getTag()).append(" ") .append(metric.getField()).append(" ").append(metric.getTimestamp() + "000000").append("\n"); } //udp傳送資料到Influxdb try (DatagramSocket ds = new DatagramSocket()) { byte[] buf = sb.toString().getBytes(); DatagramPacket dp = new DatagramPacket(buf, buf.length, this.hostAddress, this.udpPort); ds.send(dp); log.debug("send {} to influxdb", sb.toString()); } catch (SocketException e) { log.error("Cannot open udp port!", e); } catch (IOException e) { log.error("Error in transferring udp package", e); } } /** * 得到聚合資料 * * @param metrics * @return */ private List<MetricTuple> aggregate(Map<String, List<Response>> metrics) { } public boolean isEmpty(Map<String, List<Response>> map) { for (Map.Entry<String, List<Response>> entry : map.entrySet()) { if (!entry.getValue().isEmpty()) { return false; } } return true; } }
參考文件:
- ofollow,noindex" target="_blank">InfluxDB中文文件
- 玩轉時序資料庫InfluxDB