網路分析利器wireshark命令版(4):tshark結合ES
tshark
是網路分析工具wireshark
下的一個工具,主要用於命令列環境進行抓包、分析,尤其對協議深層解析時,tcpdump
難以勝任的場景中。本系列文章將整理介紹tshark
相關內容。本文將介紹與tshark
相關的流量解決方案。
利用tshark,不僅可以對現有的pcap檔案進行分析,由於可以輸出其他格式,也就可以結合ES的強大搜索能力,達到對資料報文進行記錄、分析處理的能力,可以實現回溯分析,結合kibana視覺化工具,甚至達到實時視覺化監控。
tshark + elastic stack
elastic stack全家桶
效能一直被詬病,後來另起爐灶,針對採集使用golang
構建出一套beats,用於不同的採集場景。其中針對網路流量,開發出packetbeat
。
packetbeat的優勢是定製了elasticsearch
的mapping
、kibana
一系列視覺化圖表,可以滿足一般對tcp、dns、udp等常規報文的分析。基本達到開箱即用程度。
但packetbeat也有不足,對報文的分析採用會話分析,沒有一個個報文單獨分析,傾向於應用層,對網路層面分析不足(尤其是故障排查時),此外,支援的協議有限,僅常見協議與tshark
的2000多種存在明顯差距,當遇到不支援時,需要等待支援或手動寫外掛,難度極高。
離線匯入elasticsearch
tshark
支援將pcap報文分析後生成json檔案匯入elasticsearch
,同時支援elasticsearch
的批量匯入介面_bulk
的格式,命令如下:
tshark -r test_trace.pcap -T ek > test_trace.pcap.json
之後可以將json檔案通過curl匯入。
curl -s -H "Content-Type: application/x-ndjson" -XPOST "localhost:9200/foo/_bulk" --data-binary "@/Users/test-elastic/test_trace.pcap.json"
注:
-
匯入時可能存在匯入失敗,由於
_bulk
介面對post的檔案大小有限制,儘量不要超過15MB,最好在10MB以內。如果超過,建議使用tshark的輸出條件生成多個json檔案,使用curl依次匯入。 -
如果匯入失敗,可以在
curl
加-v
檢視提示資訊。 -
預設索引名為類似
packets-2019-04-23
(報文記錄的日期),可以匯入後重新索引
可以使用類似以下命令檢視匯入情況:
curl 'http://127.0.0.1 :9200/packets-2019-04-23/_search/?size=10&pretty=true'
實時監控方案
主要思路
tshark tshark filebeat logstash
簡單實現
1. tshark部分
tshark -a duration:600 -i phy0.mon -t ad -t ad -lT fields -E separator=, -E quote=d-e _ws.col.Time-e wlan.fc.type -e wlan.fc.type_subtype -e radiotap.dbm_antsignal -e frame.len -e radiotap.datarate> tshark.csv
2. filebeat
簡單filebeat.yml配置檔案
filebeat.modules: - module: system syslog: enabled: false auth: enabled: true var.paths: ["/home/tshark.csv"] name: test output.logstash: hosts: ["localhost:5044"]
3. logstash
logstash.yml檔案,主要分為:
- 監聽5044埠接收beats的資料
- 對資料按照csv格式解析,欄位分割
- 對日期處理,轉換格式
- 新增時、分、秒,便於索引
- 對部分欄位轉換為數字格式
- 替換欄位
- 輸出到elasticsearch
input { beats { port => 5044 } } csv { source => "message" columns => [ "col.time","frame.type","frame.subtype","rssi","frame.size","data.rate" ] } date { match => [ "col.time", "YYYY-MM-DD HH:mm:ss.SSSSSSSSS" ] target => "@timestamp" } mutate { add_field => {"[hour]" => "%{+HH}"} add_field => {"[minute]" => "%{+mm}"} add_field => {"[second]" => "%{+ss}"} } mutate { convert => [ "rssi", "integer" ] convert => [ "frame.size", "integer" ] convert => [ "data.rate", "integer" ] convert => [ "second", "integer" ] convert => [ "minute", "integer" ] convert => [ "hour", "integer" ] } if[frame.type]=="0"{ mutate { replace => [ "frame.type", "Management" ] }} if[frame.type]=="1"{ mutate { replace => [ "frame.type", "Control" ] }} if[frame.type]=="2"{ mutate { replace => [ "frame.type", "Data" ] }} output { elasticsearch { hosts => ["http://localhost:9200"] index => "%{[@metadata][beat]}-%{+YYYY.MM.dd}" document_type => "%{[@metadata][type]}" } }
4. elasticsearch
可以預先匯入索引定義mapping
,這塊可以查elasticsearch文件
5. kibana
利用圖表、面板等進行資料視覺化,實現監控功能,這塊可根據業務需求進行發揮~
小結
使用tshak的報文解析、資料匯出功能,可以根據需求靈活處理,藉助開源的大資料工具,可以實現更貼合業務的工具,實現快速對網路分析、實時監控、故障排查、高階檢索、回溯分析、統計報表等,甚至在部分場景下可以替代商業的網路回溯分析系統。