EFK接入kafka訊息佇列
1 前言
在筆者最開始維護的日誌服務中,日質量較小,沒有接入kafka。隨著業務規模擴增,日質量不斷增長,接入到日誌服務的產品線不斷增多,遇到流量高峰,寫入到es的效能就會降低,cpu打滿,隨時都有叢集宕機的風險。因此,接入訊息佇列,進行削峰填谷就迫在眉睫。本文主要介紹在EFK的基礎上如何接入kafka,並做到向前相容。
2 主要內容
- 如何搭建kafka叢集
- 原有EFK升級
3 搭建kafka叢集
3.1 搭建zookeeper叢集
主要參考文章:【zookeeper安裝指南 】
由於是要線上搭建叢集,為避免單點故障,就需要部署至少3個節點(取決於多數選舉機制)。
3.1.1 下載
進入要下載的版本的目錄,選擇.tar.gz檔案下載
3.1.2 安裝
使用tar解壓要安裝的目錄即可,以3.4.5版本為例
這裡以解壓到/home/work/common,實際安裝根據自己的想安裝的目錄修改(注意如果修改,那後邊的命令和配置檔案中的路徑都要相應修改)
tar -zxf zookeeper-3.4.5.tar.gz -C /home/work/common
3.1.3 配置
在主目錄下建立data和logs兩個目錄用於儲存資料和日誌:
cd /home/work/zookeeper-3.4.5 mkdir data mkdir logs
在conf目錄下新建zoo.cfg檔案,寫入如下配置:
tickTime=2000 dataDir=/home/work/common/zookeeper1/data dataLogDir=/home/work/common/zookeeper1/logs clientPort=2181 initLimit=5 syncLimit=2 server.1=192.168.220.128:2888:3888 server.2=192.168.222.128:2888:3888 server.3=192.168.223.128:2888:3888
在zookeeper1的data/myid配置如下:
echo '1' > data/myid
zookeeper2的data/myid配置如下:
echo '2' > data/myid
zookeeper2的data/myid配置如下:
echo '3' > data/myid
3.1.4 啟停
進入bin目錄,啟動、停止、重啟分和檢視當前節點狀態(包括叢集中是何角色)別執行:
./zkServer.sh start ./zkServer.sh stop ./zkServer.sh restart ./zkServer.sh status
zookeeper叢集搭建完成之後,根據實際情況開始部署kafka。以部署2個broker為例。
3.2 搭建kafka broker叢集
3.2.1 安裝
下載並解壓包:
curl -L -O http://mirrors.cnnic.cn/apache/kafka/0.9.0.0/kafka_2.10-0.9.0.0.tgz
tar zxvf kafka_2.10-0.9.0.0.tgz
3.2.2 配置
進入kafka安裝工程根目錄編輯config/server.properties
#不同的broker對應的id不能重複 broker.id=1 delete.topic.enable=true inter.broker.protocol.version=0.10.0.1 log.message.format.version=0.10.0.1 listeners=PLAINTEXT://:9092,SSL://:9093 auto.create.topics.enable=false ssl.key.password=test ssl.keystore.location=/home/work/certificate/server-keystore.jks ssl.keystore.password=test ssl.truststore.location=/home/work/certificate/server-truststore.jks ssl.truststore.password=test num.network.threads=3 num.io.threads=8 socket.send.buffer.bytes=102400 socket.receive.buffer.bytes=102400 socket.request.max.bytes=104857600 log.dirs=/home/work/data/kafka/log num.partitions=1 num.recovery.threads.per.data.dir=1 offsets.topic.replication.factor=1 transaction.state.log.replication.factor=1 transaction.state.log.min.isr=1 log.retention.hours=72 log.segment.bytes=1073741824 log.retention.check.interval.ms=300000 zookeeper.connect=192.168.220.128:2181,192.168.222.128:2181,192.168.223.128:2181 zookeeper.connection.timeout.ms=6000 group.initial.rebalance.delay.ms=0
3.2.3 啟動kafka
進入kafka的主目錄
nohup sh bin/kafka-server-start.sh config/server.properties > /dev/null 2>&1 &
3.2.4 連通性測試
首先建立一個topic:topic_1
sh bin/kafka-topics.sh --create --topic topic_1 --partitions 2 --replication-factor 2--zookeeper 192.168.220.128:2181
可以先檢查一下是否建立成功:
sh bin/kafka-topics.sh --list --zookeeper192.168.220.128:2181
起兩個終端,一個作為producer,一個作為consumer
生產訊息:
bin/kafka-console-producer.sh --topic topic_1 --broker-list 192.168.220.128:9092,192.168.223.128:9092
消費訊息:
sh bin/kafka-console-consumer.sh --bootstrap-server 192.168.220.128:9092,192.168.223.128:9092 --topic topic_1
好了,上面的調通了,萬里長征第一步就走完了。
4 EFK接入kafka向前相容
4.1 準備證書
在之前的EFK中是通過證書進行安全加固的,所以要先為接入kafka準備一下相關的證書。要確保給kafka生成的證書和給efk生成的證書是同一個根證書。關於證書的生成,筆者會寫文章專門介紹。主要包括:
- 服務端證書
- client證書
那麼作為kafka的輸入(filebeat)和輸出(logstash),都需要kafka的client證書,kafka的broker需要的是服務端證書。
需要注意的是,filebeat配置的是pem證書,kafka和logstash的kafka-input外掛用的是jks證書~~~因此,證書生成工具最好需要能夠同時生成這兩種證書。
4.2 filebeat升級
4.2.1 input日誌收集檔案
在fields中新增log_topic欄位,指定寫入的topic
fields: module: sonofelice type: debug log_topic: topic_1 language: java
4.2.2 filebeat.yml檔案
output.kafka: hosts: ["192.168.220.128:9093","192.168.223.128:9093"] topic: '%{[fields.log_topic]}' partition.round_robin: reachable_only: false required_acks: 1 compression: gzip max_message_bytes: 1000000 ssl.certificate_authorities: ["/home/work/filebeat/keys/root-ca.pem"] ssl.certificate: "/home/work/filebeat/keys/kafka.crt.pem" ssl.key: "/home/work/filebeat/keys/kafka.key.pem"
4.3 logstash升級
input { kafka { bootstrap_servers => "10.100.27.199:9093,10.64.56.75:9093" group_id => "consumer-group-01" topics => ["topic_1"] consumer_threads => 5 decorate_events => false auto_offset_reset => "earliest" security_protocol => "SSL" ssl_keystore_password => "test" ssl_keystore_location => "/home/work/certificate/kafka-keystore.jks" ssl_keystore_password => "test" ssl_truststore_password => "test" ssl_truststore_location => "/home/work/cvca/certificate/truststore.jks" codec => json { charset => "UTF-8" } } }
那為了向前相容之前的filebeat日誌收集,我們在input中同時保留beats配置,最終配置如下:
input { kafka { bootstrap_servers => "10.100.27.199:9093,10.64.56.75:9093" group_id => "consumer-group-01" topics => ["topic_1"] consumer_threads => 5 decorate_events => false auto_offset_reset => "earliest" security_protocol => "SSL" ssl_keystore_password => "test" ssl_keystore_location => "/home/work/certificate/kafka-keystore.jks" ssl_keystore_password => "test" ssl_truststore_password => "test" ssl_truststore_location => "/home/work/cvca/certificate/truststore.jks" codec => json { charset => "UTF-8" } } beats { port => 5044 client_inactivity_timeout => 600 ssl => true ssl_certificate_authorities => ["/home/work/certificate/chain-ca.pem"] ssl_certificate => "/home/work/certificate/server.crt.pem" ssl_key => "/home/work/certificate/server.key.pem" ssl_verify_mode => "force_peer" } }
需要特別注意的是,對於kafka的input來說,codec並不是預設為json的,導致之前用beats能成功解析到es的欄位都無法解析成功,所以務必加上codec的配置。
至此,改造升級的點應該沒有太大的坑了,也能夠向前相容,接入端自行切換即可。