巡風原始碼淺析
由於一些需要,和抱著學習的目的,研讀了下巡風這款相當優秀的掃描器程式碼。
https://github.com/ysrc/xunfeng
主要分析了下兩個掃描的模組,對web端沒有跟進看,當然重點也在掃描的部分。
分析的語句都以註釋的形式標註在程式碼中了,由於能力有限,分析中的不足和錯誤歡迎指出。
整體架構邏輯
檔案結構
│Config.py# 配置檔案 │README.md# 說明文件 │Run.bat# Windows啟動服務 │Run.py# webserver │Run.sh# Linux啟動服務,重新啟動前需把程序先結束掉 │ ├─aider │Aider.py# 輔助驗證指令碼 │ ├─db# 初始資料庫結構 │ ├─masscan# 內建編譯好的Masscan程式(CentOS win64適用),需要chmod+x給執行許可權(root),若無法使用請自行編譯安裝。 ├─nascan ││NAScan.py # 網路資產資訊抓取引擎 ││ │├─lib ││common.py 其他方法 ││icmp.py# ICMP傳送類 ││log.py# 日誌輸出 ││mongo.py# 資料庫連線 ││scan.py# 掃描與識別 ││start.py# 執行緒控制 ││ │└─plugin │masscan.py# 呼叫Masscan指令碼 │ ├─views ││View.py# web請求處理 ││ │├─lib ││Conn.py# 資料庫公共類 ││CreateExcel.py# 表格處理 ││Login.py# 許可權驗證 ││QueryLogic.py# 查詢語句解析 ││ │├─static #靜態資源目錄 ││ │└─templates #模板檔案目錄 │ └─vulscan │VulScan.py# 漏洞檢測引擎 │ └─vuldb # 漏洞庫目錄
Run.sh
整個程式的開始就是從 Run.sh
開始的,可以先來看下起了哪些服務
#!/bin/bash CURRENT_PATH=`dirname $0` cd $CURRENT_PATH XUNFENG_LOG=/var/log/xunfeng XUNFENG_DB=/var/lib/mongodb [ ! -d $XUNFENG_LOG ] && mkdir -p ${XUNFENG_LOG} [ ! -d $XUNFENG_DB ] && mkdir -p ${XUNFENG_DB} nohup mongod --port 65521 --dbpath=${XUNFENG_DB} --auth> ${XUNFENG_LOG}/db.log & nohup python ./Run.py > ${XUNFENG_LOG}/web.log & nohup python ./aider/Aider.py > ${XUNFENG_LOG}/aider.log & nohup python ./nascan/NAScan.py > ${XUNFENG_LOG}/scan.log & nohup python ./vulscan/VulScan.py > ${XUNFENG_LOG}/vul.log &
可以看到主要起了如下四個服務
Run.py
from views.View import app if __name__ == '__main__': #app.debug = True app.run(threaded=True, port=80,host='0.0.0.0')
webserver可以看出這個是flask起的web端,裡面主要是做一些資料的展示和修改的。由於不是掃描器的重點,這裡就不具體分析了,可以自己看下程式碼。
Aider.py
輔助驗證指令碼,一個50行左右的單檔案,使用socket完成了一個簡單的DNS log平臺。
NAScan.py
網路資產資訊抓取引擎主要是呼叫 nascan
這個模組來進行網路資產(存活主機、開發埠、服務)的掃描。
VulScan.py
漏洞檢測引擎主要是呼叫 vulscan/vuldb
中的poc進行漏洞檢測。
nascan
模組結構
─nascan │NAScan.py # 網路資產資訊抓取引擎 │ ├─lib │common.py 其他方法 │icmp.py# ICMP傳送類 │log.py# 日誌輸出 │mongo.py# 資料庫連線 │scan.py# 掃描與識別 │start.py# 執行緒控制 └─plugin masscan.py# 呼叫Masscan指令碼
從 NAScan.py
檔案入口
# coding:utf-8 # author:wolf@YSRC import thread from lib.common import * from lib.start import * if __name__ == "__main__": try: CONFIG_INI = get_config()# 讀取配置 log.write('info', None, 0, u'獲取配置成功') # 日誌記錄 STATISTICS = get_statistics()# 讀取統計資訊 MASSCAN_AC = [0] # 識別符號 masscan是否在使用 NACHANGE = [0] # 識別符號 掃描列表是否被改變 thread.start_new_thread(monitor, (CONFIG_INI,STATISTICS,NACHANGE))# 心跳執行緒 thread.start_new_thread(cruise, (STATISTICS,MASSCAN_AC))# 失效記錄刪除執行緒 socket.setdefaulttimeout(int(CONFIG_INI['Timeout']) / 2)# 設定連線超時 ac_data = [] while True: now_time = time.localtime() now_hour = now_time.tm_hour now_day = now_time.tm_mday now_date = str(now_time.tm_year) + str(now_time.tm_mon) + str(now_day) cy_day, ac_hour = CONFIG_INI['Cycle'].split('|') log.write('info', None, 0, u'掃描規則: ' + str(CONFIG_INI['Cycle'])) if (now_hour == int(ac_hour) and now_day % int(cy_day) == 0 and now_date not in ac_data) or NACHANGE[0]:# 判斷是否進入掃描時段 ac_data.append(now_date) NACHANGE[0] = 0 log.write('info', None, 0, u'開始掃描') s = start(CONFIG_INI) s.masscan_ac = MASSCAN_AC s.statistics = STATISTICS s.run() # 開始掃描 time.sleep(60) except Exception, e: print e
準備工作
一開始是獲取配置資訊
def get_config(): config = {} # 從mongodb中讀取`nascan`的配置,可以從navicat中看到Config集合中有`vulscan`和`nascan`的掃描配置 config_info = mongo.na_db.Config.find_one({"type": "nascan"}) for name in config_info['config']: # 對於cms識別、元件容器、動態語言、服務 的配置儲存是使用`|`進行分割儲存的 # 所以在取出之前要進行簡單的格式化然後放到配置中 if name in ['Discern_cms', 'Discern_con', 'Discern_lang', 'Discern_server']: config[name] = format_config(name, config_info['config'][name]['value']) else: config[name] = config_info['config'][name]['value'] return config
然後是進行日誌記錄
# coding:utf-8 import threading import time import sys reload(sys) sys.setdefaultencoding('utf8') mutex = threading.Lock() # 執行緒互斥鎖 def write(scan_type, host, port, info): mutex.acquire() # 上鎖,避免多個程序輸出,導致格式混亂 port = int(port) try:# 由於Run.sh中使用了nohup,所以`print`的輸出會被輸出到log檔案中 time_str = time.strftime('%X', time.localtime(time.time())) if scan_type == 'portscan': print "[%s] %s:%d open" % (time_str, host, port) elif scan_type == 'server': print "[%s] %s:%d is %s" % (time_str, host, port, str(info)) elif scan_type == 'web': print "[%s] %s:%d is web" % (time_str, host, port) print "[%s] %s:%d web info %s" % (time_str, host, port, info) elif scan_type == 'active': print "[%s] %s active" % (time_str, host) elif scan_type == 'info': print "[%s] %s" % (time_str, info) except Exception, e: print 'logerror',e pass mutex.release()
之後進行讀取統計資訊
def get_statistics(): date_ = datetime.datetime.now().strftime('%Y-%m-%d') # 獲取當日的統計資訊 now_stati = mongo.na_db.Statistics.find_one({"date": date_}) if not now_stati: # 沒有當日的資訊則返回一個初始統計資訊 now_stati = {date_: {"add": 0, "update": 0, "delete": 0}} return now_stati else: # 有則返回 return {date_: now_stati['info']}
兩個監測執行緒
之後啟動了兩個現場,分別對應不同的功能
thread.start_new_thread(monitor, (CONFIG_INI,STATISTICS,NACHANGE))# 心跳執行緒 thread.start_new_thread(cruise, (STATISTICS,MASSCAN_AC))# 失效記錄刪除執行緒
monitor
monitor
心跳執行緒,主要用於判斷掃描配置是否發生了變化
def monitor(CONFIG_INI, STATISTICS, NACHANGE): while True: try: time_ = datetime.datetime.now() date_ = time_.strftime('%Y-%m-%d') # 記錄心跳 mongo.na_db.Heartbeat.update({"name": "heartbeat"}, {"$set": {"up_time": time_}}) if date_ not in STATISTICS: STATISTICS[date_] = {"add": 0, "update": 0, "delete": 0} # 更新統計資訊 mongo.na_db.Statistics.update({"date": date_}, {"$set": {"info": STATISTICS[date_]}}, upsert=True) new_config = get_config() # 獲取最新配置 # 比較配置掃描列表的base64是否相同,不同則置NACHANGE[0]為1 if base64.b64encode(CONFIG_INI["Scan_list"]) != base64.b64encode(new_config["Scan_list"]):NACHANGE[0] = 1 CONFIG_INI.clear() CONFIG_INI.update(new_config) # 更新新配置 except Exception, e: print e time.sleep(30) # 每30秒檢測一次
回到 NAScan.py
中可以看到
# 判斷是否達到了一個掃描的週期,或者心跳執行緒是否檢測到掃描列表更新 # 因為上面可以看到base64不同時會將NACHANGE[0]置於1 # 至於為什麼要傳入NACHANGE[0]這樣一個列表,而不是一個flag的int值(因為列表是引用啊! if (now_hour == int(ac_hour) and now_day % int(cy_day) == 0 and now_date not in ac_data) or NACHANGE[0]:
cruise
然後是 cruise
失效記錄刪除執行緒
def cruise(STATISTICS,MASSCAN_AC): while True: now_str = datetime.datetime.now() week = int(now_str.weekday()) hour = int(now_str.hour) if week >= 1 and week <= 5 and hour >= 9 and hour <= 18:# 非工作時間不刪除 try: # 獲取掃描資訊記錄 data = mongo.NA_INFO.find().sort("time", 1) for history_info in data: while True: # 如果masscan正在掃描即不進行清理 # 在後期可以看到在用masscan進行掃描的時候會置1 if MASSCAN_AC[0]: time.sleep(10) else: break ip = history_info['ip'] port = history_info['port'] try: # 檢測埠是否存活 sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM) sock.connect((ip, int(port))) sock.close() except Exception, e: time_ = datetime.datetime.now() date_ = time_.strftime('%Y-%m-%d') # 不存活則刪除改記錄 mongo.NA_INFO.remove({"ip": ip, "port": port}) log.write('info', None, 0, '%s:%s delete' % (ip, port)) # 日誌記錄 STATISTICS[date_]['delete'] += 1 del history_info["_id"] history_info['del_time'] = time_ history_info['type'] = 'delete' # 新增一條操作歷史 mongo.NA_HISTORY.insert(history_info) except: pass time.sleep(3600) # 60分鐘檢測一次
start.py
回到 NAScan.py
, 前期的一些工作已經做完了,後面就可以進入 while True
的掃描迴圈了
now_time = time.localtime() now_hour = now_time.tm_hour now_day = now_time.tm_mday now_date = str(now_time.tm_year) + str(now_time.tm_mon) + str(now_day) # 獲取資產探測週期 cy_day, ac_hour = CONFIG_INI['Cycle'].split('|') log.write('info', None, 0, u'掃描規則: ' + str(CONFIG_INI['Cycle'])) if (now_hour == int(ac_hour) and now_day % int(cy_day) == 0 and now_date not in ac_data) or NACHANGE[0]:# 判斷是否進入掃描時段 ac_data.append(now_date) # 判斷是否掃描過的列表 NACHANGE[0] = 0 # 置0, log.write('info', None, 0, u'開始掃描') s = start(CONFIG_INI) s.masscan_ac = MASSCAN_AC s.statistics = STATISTICS s.run() # 開始掃描 time.sleep(60)
s = start(CONFIG_INI)
初始化了一個 start
類
class start: def __init__(self, config): # 傳入CONFIG_INI 配置,然後設定類的屬性 self.config_ini = config self.queue = Queue.Queue() self.thread = int(self.config_ini['Thread']) self.scan_list = self.config_ini['Scan_list'].split('\n') self.mode = int(self.config_ini['Masscan'].split('|')[0]) self.icmp = int(self.config_ini['Port_list'].split('|')[0]) self.white_list = self.config_ini.get('White_list', '').split('\n')
然後回來額外設定了 masscan_ac
和 statistics
兩個引用識別符號(因為要與其他執行緒共享對它的修改,相當於全域性變數
然後啟動 s.run()
開始掃描
def run(self): # 在start.py中定義的全域性變數,埠列表 global AC_PORT_LIST all_ip_list = [] for ip in self.scan_list: # 處理CIDR格式的ip, eg:192.168.0.1/24 # 就不具體跟進看了,大約40行左右,涉及一些位運算格式轉換啥的 if "/" in ip: ip = cidr.CIDR(ip) if not ip:continue # 處理 192.168.0.1-192.168.0.255 這類範圍ip ip_list = self.get_ip_list(ip) # 對於白名單ip進行移除 for white_ip in self.white_list: if white_ip in ip_list: ip_list.remove(white_ip) # 是否開始了masscan掃描,開啟了mode置為1,否則為0 if self.mode == 1: # 使用masscan掃描 # 獲取檔案路徑 self.masscan_path = self.config_ini['Masscan'].split('|')[2] # 獲取掃描速率 self.masscan_rate = self.config_ini['Masscan'].split('|')[1] # 獲取存活的ip ip_list = self.get_ac_ip(ip_list) self.masscan_ac[0] = 1 AC_PORT_LIST = self.masscan(ip_list)# 如果安裝了Masscan即使用Masscan進行全埠掃描 if not AC_PORT_LIST: continue self.masscan_ac[0] = 0 for ip_str in AC_PORT_LIST.keys(): self.queue.put(ip_str)# 加入佇列 self.scan_start()# 開始掃描 else: all_ip_list.extend(ip_list) if self.mode == 0: # 不使用masscan掃描 if self.icmp: all_ip_list = self.get_ac_ip(all_ip_list) for ip_str in all_ip_list: self.queue.put(ip_str)# 加入佇列 self.scan_start()# TCP探測模式開始掃描
探測存活ip
self.get_ac_ip()
是通過ping請求來探測主機存活,後期只對存活主機進行掃描
def get_ac_ip(self, ip_list): try: s = icmp.Nscan() ipPool = set(ip_list) return s.mPing(ipPool) except Exception, e: print 'The current user permissions unable to send icmp packets' return ip_list
跟到 s.mPing()
中
def mPing(self, ipPool): # 獲得icmp的socket Sock = self.__icmpSocket Sock.settimeout(self.timeout) # 設定icmp資料報 packet = self.__icmpPacket recvFroms = set() # 初始化一個多執行緒的icmp請求類 sendThr = SendPingThr(ipPool, packet, Sock, self.timeout) # 啟動多執行緒icmp掃描 sendThr.start() while True: try: # 獲取返回的ip地址 ac_ip = Sock.recvfrom(1024)[1][0] if ac_ip not in recvFroms: log.write("active", ac_ip, 0, None) # 新增存活ip到`recvForms` recvFroms.add(ac_ip) except Exception: pass finally: if not sendThr.isAlive(): break # 返回兩個集合的交集 return recvFroms & ipPool
SendPingThr
類
class SendPingThr(threading.Thread): def __init__(self, ipPool, icmpPacket, icmpSocket, timeout=3): threading.Thread.__init__(self) self.Sock = icmpSocket self.ipPool = ipPool self.packet = icmpPacket self.timeout = timeout self.Sock.settimeout(timeout + 1) def run(self): for ip in self.ipPool: try: self.Sock.sendto(self.packet, (ip, 0)) except socket.timeout: break except: pass time.sleep(self.timeout)
masscan掃描全埠
這樣就依次將存活的ip返回到了 start.py
中的 run()
中
# 獲取到返回的存活ip ip_list = self.get_ac_ip(ip_list) # 將masscan_ac[0]置1,表示masscan正在使用 self.masscan_ac[0] = 1 # 利用masscan進行全埠掃描 AC_PORT_LIST = self.masscan(ip_list) if not AC_PORT_LIST: continue # 將masscan_ac[0]置0 self.masscan_ac[0] = 0 for ip_str in AC_PORT_LIST.keys(): self.queue.put(ip_str)# 加入佇列 self.scan_start()# 開始掃描
跟進 self.masscan()
函式
def masscan(self, ip): try: if len(ip) == 0: return sys.path.append(sys.path[0] + "/plugin") m_scan = __import__("masscan") result = m_scan.run(ip, self.masscan_path, self.masscan_rate) return result except Exception, e: print e print 'No masscan plugin detected'
跟進 m_scan.run()
import os def run(ip_list,path,rate): try: ip_file = open('target.log','w') # 將存活的ip列表寫到target.log中 ip_file.write("\n".join(ip_list)) ip_file.close() # 進行過濾一些危險字元 #(issue中也有提到,並不能完全保證後臺的安全,主要還是保證對金鑰的管理 path = str(path).translate(None, ';|&`\n') rate = str(rate).translate(None, ';|&`\n') if not os.path.exists(path):return # 用系統命令進行masscan全埠掃描 os.system("%s -p1-65535 -iL target.log -oL tmp.log --randomize-hosts --rate=%s"%(path,rate)) # 讀取掃描結果 result_file = open('tmp.log', 'r') result_json = result_file.readlines() result_file.close() del result_json[0] del result_json[-1] open_list = {} # 對掃描結果進行格式化處理 for res in result_json: try: ip = res.split()[3] port = res.split()[2] if ip in open_list: open_list[ip].append(port) else: open_list[ip] = [port] except:pass os.remove('target.log') os.remove('tmp.log') # 返回掃描結果 return open_list except: pass
這樣,再次回到 start.py
的 run()
中
# 用masscan進行全埠掃描 AC_PORT_LIST = self.masscan(ip_list) if not AC_PORT_LIST: continue # 將self.masscan_ac[0]置0,表示結束使用 self.masscan_ac[0] = 0 # 將掃描結果存入佇列中 for ip_str in AC_PORT_LIST.keys(): self.queue.put(ip_str) # 開始掃描 self.scan_start()
scan.py
前期準備
self.scan_start()
def scan_start(self): for i in range(self.thread):# 開始掃描 t = ThreadNum(self.queue) t.setDaemon(True) t.mode = self.mode t.config_ini = self.config_ini t.statistics = self.statistics t.start() self.queue.join()
跟進 ThreadNum
類
class ThreadNum(threading.Thread): def __init__(self, queue): # 賦值掃描佇列 threading.Thread.__init__(self) self.queue = queue def run(self): while True: try: # 非阻塞模式 task_host = self.queue.get(block=False) except: break try: if self.mode: # 開啟masscan掃描則使用掃描出的存活埠 port_list = AC_PORT_LIST[task_host] else: # 否則掃描特定的埠 port_list = self.config_ini['Port_list'].split('|')[1].split('\n') _s = scan.scan(task_host, port_list) # 初始化scan _s.config_ini = self.config_ini# 提供配置資訊 _s.statistics = self.statistics# 提供統計資訊 _s.run() # 啟動 except Exception, e: print e finally: self.queue.task_done()
跟到 scan
類的 run()
方法中
def run(self): self.timeout = int(self.config_ini['Timeout']) # 獲取timeout for _port in self.port_list: self.server = '' self.banner = '' self.port = int(_port) self.scan_port()# 埠掃描 if not self.banner:continue #無banner則跳過(`NULL`表示暫未檢測出,不會continue self.server_discern()# 服務識別 if self.server == '': web_info = self.try_web()# 嘗試web訪問 if web_info: # log記錄 log.write('web', self.ip, self.port, web_info) time_ = datetime.datetime.now() # 將掃描結果存入mongodb mongo.NA_INFO.update({'ip': self.ip, 'port': self.port}, {"$set": {'banner': self.banner, 'server': 'web', 'webinfo': web_info, 'time': time_}})
埠掃描
先是進行了 self.scan_port()
埠掃描
def scan_port(self): try: # 進行socket連線 sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM) sock.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1) sock.connect((self.ip, self.port)) time.sleep(0.2) except Exception, e: return try: # 獲取banner資訊 self.banner = sock.recv(1024) sock.close() # 小於等於2則置為'NULL' if len(self.banner) <= 2: self.banner = 'NULL' except Exception, e: # 異常情況也置為'NULL' self.banner = 'NULL' # 日誌記錄 log.write('portscan', self.ip, self.port, None) banner = '' hostname = self.ip2hostname(self.ip) time_ = datetime.datetime.now() date_ = time_.strftime('%Y-%m-%d') try: # 進行unicode轉換 banner = unicode(self.banner, errors='replace') if self.banner == 'NULL': banner = '' # 新增一條info資訊 mongo.NA_INFO.insert({"ip": self.ip, "port": self.port, "hostname": hostname, "banner": banner, "time": time_}) # 統計資訊+1 self.statistics[date_]['add'] += 1 except: if banner: # 原子操作,刪除已存在的記錄 history_info = mongo.NA_INFO.find_and_modify( query={"ip": self.ip, "port": self.port, "banner": {"$ne": banner}}, remove=True) if history_info: # 新增info記錄 mongo.NA_INFO.insert( {"ip": self.ip, "port": self.port, "hostname": hostname, "banner": banner, "time": time_}) # 統計資訊+1 self.statistics[date_]['update'] += 1 # 刪除原先的_id del history_info["_id"] history_info['del_time'] = time_ history_info['type'] = 'update' # 更新type和del_time之後插入一條新歷史記錄 mongo.NA_HISTORY.insert(history_info)
進行socket連線的時候,例如一些ssh之類的服務,會返回一些banner資訊
服務識別
def server_discern(self): # 先嚐試進行利用配置中的`Discern_server`進行快速匹配識別 for mark_info in self.config_ini['Discern_server']: try: name, default_port, mode, reg = mark_info if mode == 'default': # default表示用特定埠,匹配特定服務 if int(default_port) == self.port: self.server = name elif mode == 'banner': # 利用banner資訊進行正則匹配檢測 matchObj = re.search(reg, self.banner, re.I | re.M) if matchObj: self.server = name if self.server:break except: continue # 對於未檢測出服務並且埠不為80、443、8080的埠進行檢測 if not self.server and self.port not in [80,443,8080]: for mark_info in self.config_ini['Discern_server']:# 發包識別 try: name, default_port, mode, reg = mark_info if mode not in ['default','banner']: # 進行傳送特定的socket包獲取banner資訊,進行再次匹配 dis_sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM) dis_sock.connect((self.ip, self.port)) mode = mode.decode('string_escape') reg = reg.decode('string_escape') dis_sock.send(mode) time.sleep(0.3) dis_recv = dis_sock.recv(1024) dis_sock.close() matchObj = re.search(reg, dis_recv, re.I | re.M) if matchObj: self.server = name break except: pass if self.server: # 對於檢測到的服務,進行log和info的記錄 log.write("server", self.ip, self.port, self.server) mongo.NA_INFO.update({"ip": self.ip, "port": self.port}, {"$set": {"server": self.server}})
config_ini['Discern_server']
中的值
config_ini['Discern_server']
中的特定socket資料包
web訪問
def try_web(self): title_str, html = '', '' try:# 進行http/https請求,獲取響應報文 if self.port == 443: # 對於443埠的使用https協議 info = urllib2.urlopen("https://%s:%s" % (self.ip, self.port), timeout=self.timeout) else: info = urllib2.urlopen("http://%s:%s" % (self.ip, self.port), timeout=self.timeout) html = info.read() header = info.headers except urllib2.HTTPError, e: html = e.read() header = e.headers except: return if not header: return # 對於gzip格式的響應,進行解壓gzip if 'Content-Encoding' in header and 'gzip' in header['Content-Encoding']: html_data = StringIO.StringIO(html) gz = gzip.GzipFile(fileobj=html_data) html = gz.read() try: # 格式轉碼 html_code = self.get_code(header, html).strip() if html_code and len(html_code) < 12: html = html.decode(html_code).encode('utf-8') except: pass try: # 獲取titile資訊 title = re.search(r'<title>(.*?)</title>', html, flags=re.I | re.M) if title: title_str = title.group(1) except: pass try: # 將響應的http報文設定成banner資訊 web_banner = str(header) + "\r\n\r\n" + html self.banner = web_banner # 新增記錄 history_info = mongo.NA_INFO.find_one({"ip": self.ip, "port": self.port}) if 'server' not in history_info: tag = self.get_tag() web_info = {'title': title_str, 'tag': tag} return web_info else: if abs(len(history_info['banner'].encode('utf-8')) - len(web_banner)) > len(web_banner) / 60: del history_info['_id'] history_info['del_time'] = datetime.datetime.now() mongo.NA_HISTORY.insert(history_info) tag = self.get_tag() web_info = {'title': title_str, 'tag': tag} date_ = datetime.datetime.now().strftime('%Y-%m-%d') self.statistics[date_]['update'] += 1 log.write('info', None, 0, '%s:%s update web info'%(self.ip, self.port)) return web_info except: return
get_tag
()
def get_tag(self): try: url = self.ip + ':' + str(self.port) # 對web服務進行cms、元件容器、動態語言的識別 tag = map(self.discern, ['Discern_cms', 'Discern_con', 'Discern_lang'], [url, url, url]) # 過濾掉未識別出的服務 return filter(None, tag) except Exception, e: return
discern()
def discern(self, dis_type, domain): file_tmp = {} if int(domain.split(":")[1]) == 443: # http/https處理 protocol = "https://" else: protocol = "http://" try: # http請求 req = urllib2.urlopen(protocol + domain, timeout=self.timeout) header = req.headers html = req.read() except urllib2.HTTPError, e: html = e.read() header = e.headers except Exception, e: return # 對於'Discern_cms', 'Discern_con', 'Discern_lang'在資料庫中都有自己的識別判斷方式 for mark_info in self.config_ini[dis_type]: if mark_info[1] == 'header': try: if not header: return # 通過header方式則對對應的http頭中的值進行匹配 # 如存在PHPSSIONID之類的值判定為php if re.search(mark_info[3], header[mark_info[2]], re.I): return mark_info[0] except Exception, e: continue elif mark_info[1] == 'file': if mark_info[2] == 'index': try: if not html: return # 對於file index方式利用檔案字尾,如1.php這樣判斷為php語言 if re.search(mark_info[3], html, re.I): return mark_info[0] except Exception, e: continue else: # 防止重複檢測 if mark_info[2] in file_tmp: re_html = file_tmp[mark_info[2]] else: # 訪問指定的robots.txt之類的檔案 try: re_html = urllib2.urlopen(protocol + domain + "/" + mark_info[2], timeout=self.timeout).read() except urllib2.HTTPError, e: re_html = e.read() except Exception, e: return file_tmp[mark_info[2]] = re_html try: # 檢測指定檔案中是否存在特定關鍵字 # 如robots.txt中存在'php168'則為php168cms if re.search(mark_info[3], re_html, re.I): return mark_info[0] except Exception, e: print mark_info[3]
config_ini[Discern_lang]
中的值
config_ini[Discern_cms]
中的值
最後回到 run()
中
web_info = self.try_web()# 嘗試web訪問 if web_info: # 檢測完web特徵之後,就是進行簡單的log記錄,和更新資料庫中info的值 log.write('web', self.ip, self.port, web_info) time_ = datetime.datetime.now() mongo.NA_INFO.update({'ip': self.ip, 'port': self.port}, {"$set": {'banner': self.banner, 'server': 'web', 'webinfo': web_info, 'time': time_}})
到這裡,scan的掃描也就結束了,回到 start
類的 run()
中,剩下的就是不使用masscan的掃描
if self.mode == 0: # 不使用masscan掃描 # 如果設定了icmp檢測,會對ip列表進行存活檢測,只掃描存活ip if self.icmp: all_ip_list = self.get_ac_ip(all_ip_list) for ip_str in all_ip_list: self.queue.put(ip_str)# 加入佇列 self.scan_start()# TCP探測模式開始掃描
這裡的掃描過程中將ip列表改成了 all_ip_list
,其餘的掃描過程也是通過 scan_start()
來呼叫 scan
類進行掃描。
到這裡,整個 NAScan
資產掃描過程也就完成了,每次掃描完會sleep60秒,然後再次迴圈這個過程。
vulscan
用於對掃出的資產進行漏洞掃描,具體的掃描過程依賴於 vuldb
中的外掛形式進行掃描,做到可插拔的模式
json格式的外掛
轉換成json形式後就是
{ "name" : "Axis2資訊洩露", "info" : "HappyAxis.jsp 頁面存在系統敏感資訊。", "level" : "低危", "type" : "資訊洩露", "author" : "wolf@YSRC", "url": "", "keyword" : "tag:axis2", "source" : 1, "plugin" : { "url" : "/axis2/axis2-web/HappyAxis.jsp", "tag" : "敏感資訊洩露", "analyzing" : "keyword", "analyzingdata" : "Axis2 Happiness Page", "data" : "", "method" : "GET" } }
python指令碼格式的外掛
# coding:utf-8 import ftplib def get_plugin_info():# 外掛描述資訊 plugin_info = { "name": "FTP弱口令", "info": "導致敏感資訊洩露,嚴重情況可導致伺服器被入侵控制。", "level": "高危", "type": "弱口令", "author": "wolf@YSRC", "url": "", "keyword": "server:ftp",# 推薦搜尋關鍵字 } return plugin_info def check(ip, port, timeout): # 漏洞檢測程式碼 user_list = ['ftp', 'www', 'admin', 'root', 'db', 'wwwroot', 'data', 'web'] for user in user_list: for pass_ in PASSWORD_DIC:# 密碼字典無需定義,程式會自動為其賦值。 pass_ = str(pass_.replace('{user}', user)) try: ftp = ftplib.FTP() ftp.timeout = timeout ftp.connect(ip, port) ftp.login(user, pass_) if pass_ == '': pass_ = 'null' if user == 'ftp' and pass_ == 'ftp': return u"可匿名登入" return u"存在弱口令,賬號:%s,密碼:%s" % (user, pass_)# 成功返回結果,內容顯示在掃描結果頁面。 except: pass
掃描過程較資產掃描偏簡單些,一個280行左右的單檔案
一開始定義了一些全域性變數
# 新增系統路徑 sys.path.append(sys.path[0] + '/vuldb') sys.path.append(sys.path[0] + "/../") # 獲取mongodb賬號配置 from Config import ProductionConfig # 進行mongodb認證連線 db_conn = pymongo.MongoClient(ProductionConfig.DB, ProductionConfig.PORT) na_db = getattr(db_conn, ProductionConfig.DBNAME) na_db.authenticate(ProductionConfig.DBUSERNAME, ProductionConfig.DBPASSWORD) # 做了幾個集合的簡化操作 na_task = na_db.Task na_result = na_db.Result na_plugin = na_db.Plugin na_config = na_db.Config na_heart = na_db.Heartbeat # 執行緒鎖 lock = thread.allocate() # 一些全域性變數 PASSWORD_DIC = [] THREAD_COUNT = 50 TIMEOUT = 10 PLUGIN_DB = {} TASK_DATE_DIC = {} WHITE_LIST = []
然後開始執行流程
if __name__ == '__main__': init() # 進行init初始化操作 PASSWORD_DIC, THREAD_COUNT, TIMEOUT, WHITE_LIST = get_config() # 獲取配置 thread.start_new_thread(monitor, ()) # 啟動監控執行緒 while True: task_id, task_plan, task_target, task_plugin = queue_get() # 任務資訊獲取 if task_id == '': time.sleep(10) continue if PLUGIN_DB: del sys.modules[PLUGIN_DB.keys()[0]] # 清理外掛快取 PLUGIN_DB.clear() for task_netloc in task_target: while True: if int(thread._count()) < THREAD_COUNT: if task_netloc[0] in WHITE_LIST: break thread.start_new_thread(vulscan, (task_id, task_netloc, task_plugin)) break else: time.sleep(2) if task_plan == 0: na_task.update({"_id": task_id}, {"$set": {"status": 2}})
準備工作
init
用於獲取外掛的資訊
def init(): # 若資料庫中存在外掛資訊,則直接返回,否則重新獲取 if na_plugin.find().count() >= 1: return script_plugin = [] json_plugin = [] # 獲取vuldb中的外掛 file_list = os.listdir(sys.path[0] + '/vuldb') time_ = datetime.datetime.now() for filename in file_list: try: # 外掛分為json和py兩種格式 if filename.split('.')[1] == 'py': script_plugin.append(filename.split('.')[0]) if filename.split('.')[1] == 'json': json_plugin.append(filename) except: pass for plugin_name in script_plugin: try: # py格式的外掛直接匯入,然後讀取對於變數,插入到mongodb中 res_tmp = __import__(plugin_name) plugin_info = res_tmp.get_plugin_info() plugin_info['add_time'] = time_ plugin_info['filename'] = plugin_name plugin_info['count'] = 0 na_plugin.insert(plugin_info) except: pass for plugin_name in json_plugin: try: # json格式的外掛,用json解析後讀取對應變數,插入到mongodb中 json_text = open(sys.path[0] + '/vuldb/' + plugin_name, 'r').read() plugin_info = json.loads(json_text) plugin_info['add_time'] = time_ plugin_info['filename'] = plugin_name plugin_info['count'] = 0 del plugin_info['plugin'] na_plugin.insert(plugin_info) except: pass
get_config
def get_config(): try: config_info = na_config.find_one({"type": "vulscan"}) pass_row = config_info['config']['Password_dic'] thread_row = config_info['config']['Thread'] timeout_row = config_info['config']['Timeout'] white_row = config_info['config']['White_list'] password_dic = pass_row['value'].split('\n') thread_count = int(thread_row['value']) timeout = int(timeout_row['value']) white_list = white_row['value'].split('\n') return password_dic, thread_count, timeout, white_list except Exception, e: print e
和之前nascan中的讀取配置類似,只是這回讀的是type為vulscan的配置
讀取弱口令、執行緒數、timeout、白名單之類的配置引數,然後返回
monitor
新起了個 monitor
監測執行緒,監測外掛的使用情況
def monitor(): # 引入全域性變數 global PASSWORD_DIC, THREAD_COUNT, TIMEOUT, WHITE_LIST while True: # 獲取正在執行的任務 queue_count = na_task.find({"status": 0, "plan": 0}).count() if queue_count: # 如果有正在執行的任務,則置為1 load = 1 else: # 否則根據當前執行緒數,來判斷外掛是否在被使用 ac_count = thread._count() load = float(ac_count - 4) / THREAD_COUNT if load > 1: load = 1 if load < 0: load = 0 # 更新mongodb中的heatbeat集合,有外掛正在掃描 na_heart.update({"name": "load"}, {"$set": {"value": load, "up_time": datetime.datetime.now()}}) PASSWORD_DIC, THREAD_COUNT, TIMEOUT, WHITE_LIST = get_config() # 然後根據load值進行不同時間的休眠 if load > 0: time.sleep(8) else: time.sleep(60)
然後進入到 while True
的迴圈
通過 queue_get()
進行任務引數的獲取
def queue_get(): global TASK_DATE_DIC # 獲取未載入的task,更新為啟動狀態 task_req = na_task.find_and_modify(query={"status": 0, "plan": 0}, update={"$set": {"status": 1}}, sort={'time': 1}) if task_req: # 如果存在,在TASK_DATE_DIC記錄task,然後返回任務資訊 TASK_DATE_DIC[str(task_req['_id'])] = datetime.datetime.now() return task_req['_id'], task_req['plan'], task_req['target'], task_req['plugin'] else: # 獲取 plan != 0 的task列表 task_req_row = na_task.find({"plan": {"$ne": 0}}) if task_req_row: for task_req in task_req_row: # 判斷是否需要再次啟動任務 if (datetime.datetime.now() - task_req['time']).days / int(task_req['plan']) >= int(task_req['status']): if task_req['isupdate'] == 1: # 任務更新後,需要重新從info集合中獲取ip和port # 更新task集合的target task_req['target'] = update_target(json.loads(task_req['query'])) na_task.update({"_id": task_req['_id']}, {"$set": {"target": task_req['target']}}) # 更新task集合中的status自增1 na_task.update({"_id": task_req['_id']}, {"$inc": {"status": 1}}) # 在TASK_DATE_DIC記錄task TASK_DATE_DIC[str(task_req['_id'])] = datetime.datetime.now() # 返回task資訊 return task_req['_id'], task_req['plan'], task_req['target'], task_req['plugin'] return '', '', '', ''
回到 __main__
中
# 獲取任務資訊 task_id, task_plan, task_target, task_plugin = queue_get() if task_id == '': # 沒有獲取到task配置則sleep10秒後繼續獲取 time.sleep(10) continue if PLUGIN_DB: # 當有外掛快取時清理外掛快取 # 後面掃描時會匯入外掛模組,刪除之前匯入的模組 del sys.modules[PLUGIN_DB.keys()[0]] PLUGIN_DB.clear() for task_netloc in task_target: while True: # 控制執行緒數 if int(thread._count()) < THREAD_COUNT: # 剔除白名單ip if task_netloc[0] in WHITE_LIST: break # 啟動vulscan掃描執行緒 thread.start_new_thread(vulscan, (task_id, task_netloc, task_plugin)) break else: time.sleep(2) # task_plan == 0 為一次性任務 # 更新 status = 2 if task_plan == 0: na_task.update({"_id": task_id}, {"$set": {"status": 2}})
vulscan
__init__
def __init__(self, task_id, task_netloc, task_plugin): self.task_id = task_id self.task_netloc = task_netloc self.task_plugin = task_plugin self.result_info = '' self.start()
設定好類變數,然後進入 start()
def start(self): self.get_plugin_info() if '.json' in self.plugin_info['filename']:# json檢測模式 try: self.load_json_plugin()# 讀取漏洞標示 self.set_request()# 標示符轉換為請求 self.poc_check()# 檢測 except Exception, e: return else:# py指令碼檢測模式 plugin_filename = self.plugin_info['filename'] self.log(str(self.task_netloc) + "call " + self.task_plugin) if task_plugin not in PLUGIN_DB: plugin_res = __import__(plugin_filename) setattr(plugin_res, "PASSWORD_DIC", PASSWORD_DIC)# 給外掛宣告密碼字典 PLUGIN_DB[plugin_filename] = plugin_res try: self.result_info = PLUGIN_DB[plugin_filename].check(str(self.task_netloc[0]), int(self.task_netloc[1]),TIMEOUT) except: return self.save_request()# 儲存結果
json格式檢測
先 load_json_plugin()
載入配置指令碼
def get_plugin_info(self): info = na_plugin.find_one({"name": self.task_plugin}) self.plugin_info = info
然後轉換為http請求,返回請求控制代碼
def set_request(self): # 構建url url = 'http://' + self.task_netloc[0] + ":" + str(self.task_netloc[1]) + self.plugin_info['plugin']['url'] if self.plugin_info['plugin']['method'] == 'GET': # 進行GET請求 request = urllib2.Request(url) else: # 否則進行post請求 request = urllib2.Request(url, self.plugin_info['plugin']['data']) self.poc_request = request
然後驗證poc是否有效
def poc_check(self): try: # 進行http請求,獲取header和body資訊 res = urllib2.urlopen(self.poc_request, timeout=30) res_html = res.read(204800) header = res.headers # res_code = res.code except urllib2.HTTPError, e: # res_code = e.code header = e.headers res_html = e.read(204800) except Exception, e: return try: # 獲取編碼,然後轉碼 html_code = self.get_code(header, res_html).strip() if html_code and len(html_code) < 12: res_html = res_html.decode(html_code).encode('utf-8') except: pass an_type = self.plugin_info['plugin']['analyzing'] vul_tag = self.plugin_info['plugin']['tag'] analyzingdata = self.plugin_info['plugin']['analyzingdata'] if an_type == 'keyword': # 如果是關鍵詞檢測,判斷是正則還是MD5的檢測,然後進行比對 if analyzingdata.encode("utf-8") in res_html: self.result_info = vul_tag elif an_type == 'regex': if re.search(analyzingdata, res_html, re.I): self.result_info = vul_tag elif an_type == 'md5': md5 = hashlib.md5() md5.update(res_html) # 比對成功,則返回外掛tag if md5.hexdigest() == analyzingdata: self.result_info = vul_tag
py指令碼檢測
# 獲取外掛檔名 plugin_filename = self.plugin_info['filename'] self.log(str(self.task_netloc) + "call " + self.task_plugin) if task_plugin not in PLUGIN_DB: # 不在PLUGIN_DB中則匯入 plugin_res = __import__(plugin_filename) setattr(plugin_res, "PASSWORD_DIC", PASSWORD_DIC)# 給外掛宣告密碼字典 PLUGIN_DB[plugin_filename] = plugin_res # 新增到PLUGIN_DB中 try: # 啟用py指令碼的check方法,並設定timeout self.result_info = PLUGIN_DB[plugin_filename].check(str(self.task_netloc[0]), int(self.task_netloc[1]),TIMEOUT) except: return
儲存請求結果
def save_request(self): # 判斷是否掃描出結果了 if self.result_info: try: time_ = datetime.datetime.now() self.log(str(self.task_netloc) + " " + self.result_info) # 沒有這條掃描記錄則外掛掃出的記錄+1 v_count = na_result.find( {"ip": self.task_netloc[0], "port": self.task_netloc[1], "info": self.result_info}).count() if not v_count: na_plugin.update({"name": self.task_plugin}, {"$inc": {'count': 1}}) vulinfo = {"vul_name": self.plugin_info['name'], "vul_level": self.plugin_info['level'], "vul_type": self.plugin_info['type']} w_vul = {"task_id": self.task_id, "ip": self.task_netloc[0], "port": self.task_netloc[1], "vul_info": vulinfo, "info": self.result_info, "time": time_, "task_date": TASK_DATE_DIC[str(self.task_id)]} # 新增掃描結果記錄 na_result.insert(w_vul) except Exception, e: pass
到此也就完成了vulscan的掃描過程。
最後
巡風中對於掃描的分工,多執行緒的處理都有很多值得學習和借鑑的地方。而且幾乎都有增加一些心跳執行緒,用於監測。
分析中難免有些不足或者錯誤,歡迎大佬們指出!