Kingkk's Blog.

巡风源码浅析

2019/02/16 Share

前言

由于一些需要,和抱着学习的目的,研读了下巡风这款相当优秀的扫描器代码。

https://github.com/ysrc/xunfeng

主要分析了下两个扫描的模块,对web端没有跟进看,当然重点也在扫描的部分。

分析的语句都以注释的形式标注在代码中了,由于能力有限,分析中的不足和错误欢迎指出。

整体架构逻辑

文件结构

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
│  Config.py  # 配置文件
│ README.md # 说明文档
│ Run.bat # Windows启动服务
│ Run.py # webserver
│ Run.sh # Linux启动服务,重新启动前需把进程先结束掉

├─aider
│ Aider.py # 辅助验证脚本

├─db # 初始数据库结构

├─masscan # 内置编译好的Masscan程序(CentOS win64适用),需要chmod+x给执行权限(root),若无法使用请自行编译安装。
├─nascan
│ │ NAScan.py # 网络资产信息抓取引擎
│ │
│ ├─lib
│ │ common.py 其他方法
│ │ icmp.py # ICMP发送类
│ │ log.py # 日志输出
│ │ mongo.py # 数据库连接
│ │ scan.py # 扫描与识别
│ │ start.py # 线程控制
│ │
│ └─plugin
│ masscan.py # 调用Masscan脚本

├─views
│ │ View.py # web请求处理
│ │
│ ├─lib
│ │ Conn.py # 数据库公共类
│ │ CreateExcel.py # 表格处理
│ │ Login.py # 权限验证
│ │ QueryLogic.py # 查询语句解析
│ │
│ ├─static #静态资源目录
│ │
│ └─templates #模板文件目录

└─vulscan
│ VulScan.py # 漏洞检测引擎

└─vuldb # 漏洞库目录

Run.sh

整个程序的开始就是从Run.sh开始的,可以先来看下起了哪些服务

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
#!/bin/bash
CURRENT_PATH=`dirname $0`
cd $CURRENT_PATH

XUNFENG_LOG=/var/log/xunfeng
XUNFENG_DB=/var/lib/mongodb

[ ! -d $XUNFENG_LOG ] && mkdir -p ${XUNFENG_LOG}
[ ! -d $XUNFENG_DB ] && mkdir -p ${XUNFENG_DB}

nohup mongod --port 65521 --dbpath=${XUNFENG_DB} --auth > ${XUNFENG_LOG}/db.log &
nohup python ./Run.py > ${XUNFENG_LOG}/web.log &
nohup python ./aider/Aider.py > ${XUNFENG_LOG}/aider.log &
nohup python ./nascan/NAScan.py > ${XUNFENG_LOG}/scan.log &
nohup python ./vulscan/VulScan.py > ${XUNFENG_LOG}/vul.log &

可以看到主要起了如下四个服务

Run.py

1
2
3
4
5
from views.View import app

if __name__ == '__main__':
#app.debug = True
app.run(threaded=True, port=80,host='0.0.0.0')

webserver可以看出这个是flask起的web端,里面主要是做一些数据的展示和修改的。由于不是扫描器的重点,这里就不具体分析了,可以自己看下代码。

Aider.py

辅助验证脚本,一个50行左右的单文件,使用socket完成了一个简单的DNS log平台。

NAScan.py

网络资产信息抓取引擎 主要是调用nascan这个模块来进行网络资产(存活主机、开发端口、服务)的扫描。

VulScan.py

漏洞检测引擎 主要是调用vulscan/vuldb中的poc进行漏洞检测。

nascan

模块结构

1
2
3
4
5
6
7
8
9
10
11
12
─nascan
│ NAScan.py # 网络资产信息抓取引擎

├─lib
│ common.py 其他方法
│ icmp.py # ICMP发送类
│ log.py # 日志输出
│ mongo.py # 数据库连接
│ scan.py # 扫描与识别
│ start.py # 线程控制
└─plugin
masscan.py # 调用Masscan脚本

NAScan.py文件入口

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
# coding:utf-8
# author:wolf@YSRC
import thread
from lib.common import *
from lib.start import *
if __name__ == "__main__":
try:
CONFIG_INI = get_config() # 读取配置
log.write('info', None, 0, u'获取配置成功') # 日志记录
STATISTICS = get_statistics() # 读取统计信息
MASSCAN_AC = [0] # 标识符 masscan是否在使用
NACHANGE = [0] # 标识符 扫描列表是否被改变
thread.start_new_thread(monitor, (CONFIG_INI,STATISTICS,NACHANGE)) # 心跳线程
thread.start_new_thread(cruise, (STATISTICS,MASSCAN_AC)) # 失效记录删除线程
socket.setdefaulttimeout(int(CONFIG_INI['Timeout']) / 2) # 设置连接超时
ac_data = []
while True:
now_time = time.localtime()
now_hour = now_time.tm_hour
now_day = now_time.tm_mday
now_date = str(now_time.tm_year) + str(now_time.tm_mon) + str(now_day)
cy_day, ac_hour = CONFIG_INI['Cycle'].split('|')
log.write('info', None, 0, u'扫描规则: ' + str(CONFIG_INI['Cycle']))
if (now_hour == int(ac_hour) and now_day % int(cy_day) == 0 and now_date not in ac_data) or NACHANGE[0]: # 判断是否进入扫描时段
ac_data.append(now_date)
NACHANGE[0] = 0
log.write('info', None, 0, u'开始扫描')
s = start(CONFIG_INI)
s.masscan_ac = MASSCAN_AC
s.statistics = STATISTICS
s.run() # 开始扫描
time.sleep(60)
except Exception, e:
print e

准备工作

一开始是获取配置信息

1
2
3
4
5
6
7
8
9
10
11
12
def get_config():
config = {}
# 从mongodb中读取`nascan`的配置,可以从navicat中看到Config集合中有`vulscan`和`nascan`的扫描配置
config_info = mongo.na_db.Config.find_one({"type": "nascan"})
for name in config_info['config']:
# 对于cms识别、组件容器、动态语言、服务 的配置存储是使用`|`进行分割存储的
# 所以在取出之前要进行简单的格式化然后放到配置中
if name in ['Discern_cms', 'Discern_con', 'Discern_lang', 'Discern_server']:
config[name] = format_config(name, config_info['config'][name]['value'])
else:
config[name] = config_info['config'][name]['value']
return config

然后是进行日志记录

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
# coding:utf-8
import threading
import time
import sys
reload(sys)
sys.setdefaultencoding('utf8')
mutex = threading.Lock() # 线程互斥锁
def write(scan_type, host, port, info):
mutex.acquire() # 上锁,避免多个进程输出,导致格式混乱
port = int(port)
try: # 由于Run.sh中使用了nohup,所以`print`的输出会被输出到log文件中
time_str = time.strftime('%X', time.localtime(time.time()))
if scan_type == 'portscan':
print "[%s] %s:%d open" % (time_str, host, port)
elif scan_type == 'server':
print "[%s] %s:%d is %s" % (time_str, host, port, str(info))
elif scan_type == 'web':
print "[%s] %s:%d is web" % (time_str, host, port)
print "[%s] %s:%d web info %s" % (time_str, host, port, info)
elif scan_type == 'active':
print "[%s] %s active" % (time_str, host)
elif scan_type == 'info':
print "[%s] %s" % (time_str, info)
except Exception, e:
print 'logerror',e
pass
mutex.release()

之后进行读取统计信息

1
2
3
4
5
6
7
8
9
10
11
def get_statistics():
date_ = datetime.datetime.now().strftime('%Y-%m-%d')
# 获取当日的统计信息
now_stati = mongo.na_db.Statistics.find_one({"date": date_})
if not now_stati:
# 没有当日的信息则返回一个初始统计信息
now_stati = {date_: {"add": 0, "update": 0, "delete": 0}}
return now_stati
else:
# 有则返回
return {date_: now_stati['info']}

两个监测线程

之后启动了两个现场,分别对应不同的功能

1
2
thread.start_new_thread(monitor, (CONFIG_INI,STATISTICS,NACHANGE))  # 心跳线程
thread.start_new_thread(cruise, (STATISTICS,MASSCAN_AC)) # 失效记录删除线程

monitor

monitor心跳线程,主要用于判断扫描配置是否发生了变化

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
def monitor(CONFIG_INI, STATISTICS, NACHANGE):
while True:
try:
time_ = datetime.datetime.now()
date_ = time_.strftime('%Y-%m-%d')
# 记录心跳
mongo.na_db.Heartbeat.update({"name": "heartbeat"}, {"$set": {"up_time": time_}})
if date_ not in STATISTICS: STATISTICS[date_] = {"add": 0, "update": 0, "delete": 0}
# 更新统计信息
mongo.na_db.Statistics.update({"date": date_}, {"$set": {"info": STATISTICS[date_]}}, upsert=True)
new_config = get_config() # 获取最新配置
# 比较配置扫描列表的base64是否相同,不同则置NACHANGE[0]为1
if base64.b64encode(CONFIG_INI["Scan_list"]) != base64.b64encode(new_config["Scan_list"]):NACHANGE[0] = 1
CONFIG_INI.clear()
CONFIG_INI.update(new_config) # 更新新配置
except Exception, e:
print e
time.sleep(30) # 每30秒检测一次

回到NAScan.py中可以看到

1
2
3
4
# 判断是否达到了一个扫描的周期,或者心跳线程是否检测到扫描列表更新
# 因为上面可以看到base64不同时会将NACHANGE[0]置于1
# 至于为什么要传入NACHANGE[0]这样一个列表,而不是一个flag的int值(因为列表是引用啊!
if (now_hour == int(ac_hour) and now_day % int(cy_day) == 0 and now_date not in ac_data) or NACHANGE[0]:

cruise

然后是cruise 失效记录删除线程

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
def cruise(STATISTICS,MASSCAN_AC):
while True:
now_str = datetime.datetime.now()
week = int(now_str.weekday())
hour = int(now_str.hour)
if week >= 1 and week <= 5 and hour >= 9 and hour <= 18: # 非工作时间不删除
try:
# 获取扫描信息记录
data = mongo.NA_INFO.find().sort("time", 1)
for history_info in data:
while True:
# 如果masscan正在扫描即不进行清理
# 在后期可以看到在用masscan进行扫描的时候会置1
if MASSCAN_AC[0]:
time.sleep(10)
else:
break
ip = history_info['ip']
port = history_info['port']
try:
# 检测端口是否存活
sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
sock.connect((ip, int(port)))
sock.close()
except Exception, e:
time_ = datetime.datetime.now()
date_ = time_.strftime('%Y-%m-%d')
# 不存活则删除改记录
mongo.NA_INFO.remove({"ip": ip, "port": port})
log.write('info', None, 0, '%s:%s delete' % (ip, port)) # 日志记录
STATISTICS[date_]['delete'] += 1
del history_info["_id"]
history_info['del_time'] = time_
history_info['type'] = 'delete'
# 添加一条操作历史
mongo.NA_HISTORY.insert(history_info)
except:
pass
time.sleep(3600) # 60分钟检测一次

start.py

回到NAScan.py, 前期的一些工作已经做完了,后面就可以进入while True的扫描循环了

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
now_time = time.localtime()
now_hour = now_time.tm_hour
now_day = now_time.tm_mday
now_date = str(now_time.tm_year) + str(now_time.tm_mon) + str(now_day)
# 获取资产探测周期
cy_day, ac_hour = CONFIG_INI['Cycle'].split('|')
log.write('info', None, 0, u'扫描规则: ' + str(CONFIG_INI['Cycle']))
if (now_hour == int(ac_hour) and now_day % int(cy_day) == 0 and now_date not in ac_data) or NACHANGE[0]: # 判断是否进入扫描时段
ac_data.append(now_date) # 判断是否扫描过的列表
NACHANGE[0] = 0 # 置0,
log.write('info', None, 0, u'开始扫描')
s = start(CONFIG_INI)
s.masscan_ac = MASSCAN_AC
s.statistics = STATISTICS
s.run() # 开始扫描
time.sleep(60)

s = start(CONFIG_INI)初始化了一个start

1
2
3
4
5
6
7
8
9
10
class start:
def __init__(self, config):
# 传入CONFIG_INI 配置,然后设置类的属性
self.config_ini = config
self.queue = Queue.Queue()
self.thread = int(self.config_ini['Thread'])
self.scan_list = self.config_ini['Scan_list'].split('\n')
self.mode = int(self.config_ini['Masscan'].split('|')[0])
self.icmp = int(self.config_ini['Port_list'].split('|')[0])
self.white_list = self.config_ini.get('White_list', '').split('\n')

然后回来额外设置了masscan_acstatistics两个引用标识符(因为要与其他线程共享对它的修改,相当于全局变量

然后启动s.run()开始扫描

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
def run(self):
# 在start.py中定义的全局变量,端口列表
global AC_PORT_LIST
all_ip_list = []
for ip in self.scan_list:
# 处理CIDR格式的ip, eg:192.168.0.1/24
# 就不具体跟进看了,大约40行左右,涉及一些位运算格式转换啥的
if "/" in ip: ip = cidr.CIDR(ip)
if not ip:continue
# 处理 192.168.0.1-192.168.0.255 这类范围ip
ip_list = self.get_ip_list(ip)
# 对于白名单ip进行移除
for white_ip in self.white_list:
if white_ip in ip_list:
ip_list.remove(white_ip)
# 是否开始了masscan扫描,开启了mode置为1,否则为0
if self.mode == 1: # 使用masscan扫描
# 获取文件路径
self.masscan_path = self.config_ini['Masscan'].split('|')[2]
# 获取扫描速率
self.masscan_rate = self.config_ini['Masscan'].split('|')[1]
# 获取存活的ip
ip_list = self.get_ac_ip(ip_list)
self.masscan_ac[0] = 1
AC_PORT_LIST = self.masscan(ip_list) # 如果安装了Masscan即使用Masscan进行全端口扫描
if not AC_PORT_LIST: continue
self.masscan_ac[0] = 0
for ip_str in AC_PORT_LIST.keys(): self.queue.put(ip_str) # 加入队列
self.scan_start() # 开始扫描
else:
all_ip_list.extend(ip_list)
if self.mode == 0: # 不使用masscan扫描
if self.icmp: all_ip_list = self.get_ac_ip(all_ip_list)
for ip_str in all_ip_list: self.queue.put(ip_str) # 加入队列
self.scan_start() # TCP探测模式开始扫描

探测存活ip

self.get_ac_ip()是通过ping请求来探测主机存活,后期只对存活主机进行扫描

1
2
3
4
5
6
7
8
def get_ac_ip(self, ip_list):
try:
s = icmp.Nscan()
ipPool = set(ip_list)
return s.mPing(ipPool)
except Exception, e:
print 'The current user permissions unable to send icmp packets'
return ip_list

跟到s.mPing()

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
def mPing(self, ipPool):
# 获得icmp的socket
Sock = self.__icmpSocket
Sock.settimeout(self.timeout)
# 设置icmp数据报
packet = self.__icmpPacket
recvFroms = set()
# 初始化一个多线程的icmp请求类
sendThr = SendPingThr(ipPool, packet, Sock, self.timeout)
# 启动多线程icmp扫描
sendThr.start()
while True:
try:
# 获取返回的ip地址
ac_ip = Sock.recvfrom(1024)[1][0]
if ac_ip not in recvFroms:
log.write("active", ac_ip, 0, None)
# 添加存活ip到`recvForms`
recvFroms.add(ac_ip)
except Exception:
pass
finally:
if not sendThr.isAlive():
break
# 返回两个集合的交集
return recvFroms & ipPool

SendPingThr

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
class SendPingThr(threading.Thread):
def __init__(self, ipPool, icmpPacket, icmpSocket, timeout=3):
threading.Thread.__init__(self)
self.Sock = icmpSocket
self.ipPool = ipPool
self.packet = icmpPacket
self.timeout = timeout
self.Sock.settimeout(timeout + 1)

def run(self):
for ip in self.ipPool:
try:
self.Sock.sendto(self.packet, (ip, 0))
except socket.timeout:
break
except:
pass
time.sleep(self.timeout)

masscan扫描全端口

这样就依次将存活的ip返回到了start.py中的run()

1
2
3
4
5
6
7
8
9
10
11
# 获取到返回的存活ip
ip_list = self.get_ac_ip(ip_list)
# 将masscan_ac[0]置1,表示masscan正在使用
self.masscan_ac[0] = 1
# 利用masscan进行全端口扫描
AC_PORT_LIST = self.masscan(ip_list)
if not AC_PORT_LIST: continue
# 将masscan_ac[0]置0
self.masscan_ac[0] = 0
for ip_str in AC_PORT_LIST.keys(): self.queue.put(ip_str) # 加入队列
self.scan_start() # 开始扫描

跟进self.masscan()函数

1
2
3
4
5
6
7
8
9
10
def masscan(self, ip):
try:
if len(ip) == 0: return
sys.path.append(sys.path[0] + "/plugin")
m_scan = __import__("masscan")
result = m_scan.run(ip, self.masscan_path, self.masscan_rate)
return result
except Exception, e:
print e
print 'No masscan plugin detected'

跟进m_scan.run()

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
import os
def run(ip_list,path,rate):
try:
ip_file = open('target.log','w')
# 将存活的ip列表写到target.log中
ip_file.write("\n".join(ip_list))
ip_file.close()
# 进行过滤一些危险字符
#(issue中也有提到,并不能完全保证后台的安全,主要还是保证对密钥的管理
path = str(path).translate(None, ';|&`\n')
rate = str(rate).translate(None, ';|&`\n')
if not os.path.exists(path):return
# 用系统命令进行masscan全端口扫描
os.system("%s -p1-65535 -iL target.log -oL tmp.log --randomize-hosts --rate=%s"%(path,rate))
# 读取扫描结果
result_file = open('tmp.log', 'r')
result_json = result_file.readlines()
result_file.close()
del result_json[0]
del result_json[-1]
open_list = {}
# 对扫描结果进行格式化处理
for res in result_json:
try:
ip = res.split()[3]
port = res.split()[2]
if ip in open_list:
open_list[ip].append(port)
else:
open_list[ip] = [port]
except:pass
os.remove('target.log')
os.remove('tmp.log')
# 返回扫描结果
return open_list
except:
pass

这样,再次回到start.pyrun()

1
2
3
4
5
6
7
8
9
# 用masscan进行全端口扫描
AC_PORT_LIST = self.masscan(ip_list)
if not AC_PORT_LIST: continue
# 将self.masscan_ac[0]置0,表示结束使用
self.masscan_ac[0] = 0
# 将扫描结果存入队列中
for ip_str in AC_PORT_LIST.keys(): self.queue.put(ip_str)
# 开始扫描
self.scan_start()

scan.py

前期准备

self.scan_start()

1
2
3
4
5
6
7
8
9
def scan_start(self):
for i in range(self.thread): # 开始扫描
t = ThreadNum(self.queue)
t.setDaemon(True)
t.mode = self.mode
t.config_ini = self.config_ini
t.statistics = self.statistics
t.start()
self.queue.join()

跟进ThreadNum

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
class ThreadNum(threading.Thread):
def __init__(self, queue):
# 赋值扫描队列
threading.Thread.__init__(self)
self.queue = queue

def run(self):
while True:
try:
# 非阻塞模式
task_host = self.queue.get(block=False)
except:
break
try:
if self.mode:
# 开启masscan扫描则使用扫描出的存活端口
port_list = AC_PORT_LIST[task_host]
else:
# 否则扫描特定的端口
port_list = self.config_ini['Port_list'].split('|')[1].split('\n')
_s = scan.scan(task_host, port_list) # 初始化scan
_s.config_ini = self.config_ini # 提供配置信息
_s.statistics = self.statistics # 提供统计信息
_s.run() # 启动
except Exception, e:
print e
finally:
self.queue.task_done()

跟到scan类的run()方法中

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
def run(self):
self.timeout = int(self.config_ini['Timeout']) # 获取timeout
for _port in self.port_list:
self.server = ''
self.banner = ''
self.port = int(_port)
self.scan_port() # 端口扫描
if not self.banner:continue #无banner则跳过(`NULL`表示暂未检测出,不会continue
self.server_discern() # 服务识别
if self.server == '':
web_info = self.try_web() # 尝试web访问
if web_info:
# log记录
log.write('web', self.ip, self.port, web_info)
time_ = datetime.datetime.now()
# 将扫描结果存入mongodb
mongo.NA_INFO.update({'ip': self.ip, 'port': self.port},
{"$set": {'banner': self.banner, 'server': 'web', 'webinfo': web_info,
'time': time_}})

端口扫描

先是进行了self.scan_port()端口扫描

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
def scan_port(self):
try:
# 进行socket连接
sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
sock.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
sock.connect((self.ip, self.port))
time.sleep(0.2)
except Exception, e:
return
try:
# 获取banner信息
self.banner = sock.recv(1024)
sock.close()
# 小于等于2则置为'NULL'
if len(self.banner) <= 2:
self.banner = 'NULL'
except Exception, e:
# 异常情况也置为'NULL'
self.banner = 'NULL'
# 日志记录
log.write('portscan', self.ip, self.port, None)
banner = ''
hostname = self.ip2hostname(self.ip)
time_ = datetime.datetime.now()
date_ = time_.strftime('%Y-%m-%d')
try:
# 进行unicode转换
banner = unicode(self.banner, errors='replace')
if self.banner == 'NULL': banner = ''
# 添加一条info信息
mongo.NA_INFO.insert({"ip": self.ip, "port": self.port, "hostname": hostname, "banner": banner, "time": time_})
# 统计信息+1
self.statistics[date_]['add'] += 1
except:
if banner:
# 原子操作,删除已存在的记录
history_info = mongo.NA_INFO.find_and_modify(
query={"ip": self.ip, "port": self.port, "banner": {"$ne": banner}}, remove=True)
if history_info:
# 新增info记录
mongo.NA_INFO.insert(
{"ip": self.ip, "port": self.port, "hostname": hostname, "banner": banner, "time": time_})
# 统计信息+1
self.statistics[date_]['update'] += 1
# 删除原先的_id
del history_info["_id"]
history_info['del_time'] = time_
history_info['type'] = 'update'
# 更新type和del_time之后插入一条新历史记录
mongo.NA_HISTORY.insert(history_info)

进行socket连接的时候,例如一些ssh之类的服务,会返回一些banner信息

服务识别

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
def server_discern(self):
# 先尝试进行利用配置中的`Discern_server`进行快速匹配识别
for mark_info in self.config_ini['Discern_server']:
try:
name, default_port, mode, reg = mark_info
if mode == 'default':
# default表示用特定端口,匹配特定服务
if int(default_port) == self.port:
self.server = name
elif mode == 'banner':
# 利用banner信息进行正则匹配检测
matchObj = re.search(reg, self.banner, re.I | re.M)
if matchObj:
self.server = name
if self.server:break
except:
continue
# 对于未检测出服务并且端口不为80、443、8080的端口进行检测
if not self.server and self.port not in [80,443,8080]:
for mark_info in self.config_ini['Discern_server']: # 发包识别
try:
name, default_port, mode, reg = mark_info
if mode not in ['default','banner']:
# 进行发送特定的socket包获取banner信息,进行再次匹配
dis_sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
dis_sock.connect((self.ip, self.port))
mode = mode.decode('string_escape')
reg = reg.decode('string_escape')
dis_sock.send(mode)
time.sleep(0.3)
dis_recv = dis_sock.recv(1024)
dis_sock.close()
matchObj = re.search(reg, dis_recv, re.I | re.M)
if matchObj:
self.server = name
break
except:
pass
if self.server:
# 对于检测到的服务,进行log和info的记录
log.write("server", self.ip, self.port, self.server)
mongo.NA_INFO.update({"ip": self.ip, "port": self.port}, {"$set": {"server": self.server}})

config_ini['Discern_server']中的值

config_ini['Discern_server']中的特定socket数据包

web访问

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
def try_web(self):
title_str, html = '', ''
try:# 进行http/https请求,获取响应报文
if self.port == 443:
# 对于443端口的使用https协议
info = urllib2.urlopen("https://%s:%s" % (self.ip, self.port), timeout=self.timeout)
else:
info = urllib2.urlopen("http://%s:%s" % (self.ip, self.port), timeout=self.timeout)
html = info.read()
header = info.headers
except urllib2.HTTPError, e:
html = e.read()
header = e.headers
except:
return
if not header: return
# 对于gzip格式的响应,进行解压gzip
if 'Content-Encoding' in header and 'gzip' in header['Content-Encoding']:
html_data = StringIO.StringIO(html)
gz = gzip.GzipFile(fileobj=html_data)
html = gz.read()
try:
# 格式转码
html_code = self.get_code(header, html).strip()
if html_code and len(html_code) < 12:
html = html.decode(html_code).encode('utf-8')
except: pass
try:
# 获取titile信息
title = re.search(r'<title>(.*?)</title>', html, flags=re.I | re.M)
if title: title_str = title.group(1)
except: pass
try:
# 将响应的http报文设置成banner信息
web_banner = str(header) + "\r\n\r\n" + html
self.banner = web_banner
# 添加记录
history_info = mongo.NA_INFO.find_one({"ip": self.ip, "port": self.port})
if 'server' not in history_info:
tag = self.get_tag()
web_info = {'title': title_str, 'tag': tag}
return web_info
else:
if abs(len(history_info['banner'].encode('utf-8')) - len(web_banner)) > len(web_banner) / 60:
del history_info['_id']
history_info['del_time'] = datetime.datetime.now()
mongo.NA_HISTORY.insert(history_info)
tag = self.get_tag()
web_info = {'title': title_str, 'tag': tag}
date_ = datetime.datetime.now().strftime('%Y-%m-%d')
self.statistics[date_]['update'] += 1
log.write('info', None, 0, '%s:%s update web info'%(self.ip, self.port))
return web_info
except:
return

get_tag()

1
2
3
4
5
6
7
8
9
def get_tag(self):
try:
url = self.ip + ':' + str(self.port)
# 对web服务进行cms、组件容器、动态语言的识别
tag = map(self.discern, ['Discern_cms', 'Discern_con', 'Discern_lang'], [url, url, url])
# 过滤掉未识别出的服务
return filter(None, tag)
except Exception, e:
return

discern()

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
def discern(self, dis_type, domain):
file_tmp = {}
if int(domain.split(":")[1]) == 443: # http/https处理
protocol = "https://"
else:
protocol = "http://"
try:
# http请求
req = urllib2.urlopen(protocol + domain, timeout=self.timeout)
header = req.headers
html = req.read()
except urllib2.HTTPError, e:
html = e.read()
header = e.headers
except Exception, e:
return

# 对于'Discern_cms', 'Discern_con', 'Discern_lang'在数据库中都有自己的识别判断方式
for mark_info in self.config_ini[dis_type]:
if mark_info[1] == 'header':
try:
if not header: return
# 通过header方式则对对应的http头中的值进行匹配
# 如存在PHPSSIONID之类的值判定为php
if re.search(mark_info[3], header[mark_info[2]], re.I):
return mark_info[0]
except Exception, e:
continue
elif mark_info[1] == 'file':
if mark_info[2] == 'index':
try:
if not html: return
# 对于file index方式利用文件后缀,如1.php这样判断为php语言
if re.search(mark_info[3], html, re.I):
return mark_info[0]
except Exception, e:
continue
else:
# 防止重复检测
if mark_info[2] in file_tmp:
re_html = file_tmp[mark_info[2]]
else:
# 访问指定的robots.txt之类的文件
try:
re_html = urllib2.urlopen(protocol + domain + "/" + mark_info[2],
timeout=self.timeout).read()
except urllib2.HTTPError, e:
re_html = e.read()
except Exception, e:
return
file_tmp[mark_info[2]] = re_html
try:
# 检测指定文件中是否存在特定关键字
# 如robots.txt中存在'php168'则为php168cms
if re.search(mark_info[3], re_html, re.I):
return mark_info[0]
except Exception, e:
print mark_info[3]

config_ini[Discern_lang]中的值

config_ini[Discern_cms]中的值


最后回到run()

1
2
3
4
5
6
7
8
web_info = self.try_web()  # 尝试web访问
if web_info:
# 检测完web特征之后,就是进行简单的log记录,和更新数据库中info的值
log.write('web', self.ip, self.port, web_info)
time_ = datetime.datetime.now()
mongo.NA_INFO.update({'ip': self.ip, 'port': self.port},
{"$set": {'banner': self.banner, 'server': 'web', 'webinfo': web_info,
'time': time_}})

到这里,scan的扫描也就结束了,回到start类的run()中,剩下的就是不使用masscan的扫描

1
2
3
4
5
if self.mode == 0: # 不使用masscan扫描
# 如果设置了icmp检测,会对ip列表进行存活检测,只扫描存活ip
if self.icmp: all_ip_list = self.get_ac_ip(all_ip_list)
for ip_str in all_ip_list: self.queue.put(ip_str) # 加入队列
self.scan_start() # TCP探测模式开始扫描

这里的扫描过程中将ip列表改成了all_ip_list,其余的扫描过程也是通过scan_start()来调用scan类进行扫描。

到这里,整个NAScan资产扫描过程也就完成了,每次扫描完会sleep60秒,然后再次循环这个过程。

vulscan

用于对扫出的资产进行漏洞扫描,具体的扫描过程依赖于vuldb中的插件形式进行扫描,做到可插拔的模式

json格式的插件

转换成json形式后就是

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
{
"name" : "Axis2信息泄露",
"info" : "HappyAxis.jsp 页面存在系统敏感信息。",
"level" : "低危",
"type" : "信息泄露",
"author" : "wolf@YSRC",
"url": "",
"keyword" : "tag:axis2",
"source" : 1,
"plugin" : {
"url" : "/axis2/axis2-web/HappyAxis.jsp",
"tag" : "敏感信息泄露",
"analyzing" : "keyword",
"analyzingdata" : "Axis2 Happiness Page",
"data" : "",
"method" : "GET"
}
}

python脚本格式的插件

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
# coding:utf-8

import ftplib

def get_plugin_info(): # 插件描述信息
plugin_info = {
"name": "FTP弱口令",
"info": "导致敏感信息泄露,严重情况可导致服务器被入侵控制。",
"level": "高危",
"type": "弱口令",
"author": "wolf@YSRC",
"url": "",
"keyword": "server:ftp", # 推荐搜索关键字
}
return plugin_info

def check(ip, port, timeout): # 漏洞检测代码
user_list = ['ftp', 'www', 'admin', 'root', 'db', 'wwwroot', 'data', 'web']
for user in user_list:
for pass_ in PASSWORD_DIC: # 密码字典无需定义,程序会自动为其赋值。
pass_ = str(pass_.replace('{user}', user))
try:
ftp = ftplib.FTP()
ftp.timeout = timeout
ftp.connect(ip, port)
ftp.login(user, pass_)
if pass_ == '': pass_ = 'null'
if user == 'ftp' and pass_ == 'ftp': return u"可匿名登录"
return u"存在弱口令,账号:%s,密码:%s" % (user, pass_) # 成功返回结果,内容显示在扫描结果页面。
except:
pass

扫描过程较资产扫描偏简单些,一个280行左右的单文件

一开始定义了一些全局变量

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
# 添加系统路径
sys.path.append(sys.path[0] + '/vuldb')
sys.path.append(sys.path[0] + "/../")
# 获取mongodb账号配置
from Config import ProductionConfig
# 进行mongodb认证连接
db_conn = pymongo.MongoClient(ProductionConfig.DB, ProductionConfig.PORT)
na_db = getattr(db_conn, ProductionConfig.DBNAME)
na_db.authenticate(ProductionConfig.DBUSERNAME, ProductionConfig.DBPASSWORD)
# 做了几个集合的简化操作
na_task = na_db.Task
na_result = na_db.Result
na_plugin = na_db.Plugin
na_config = na_db.Config
na_heart = na_db.Heartbeat
# 线程锁
lock = thread.allocate()
# 一些全局变量
PASSWORD_DIC = []
THREAD_COUNT = 50
TIMEOUT = 10
PLUGIN_DB = {}
TASK_DATE_DIC = {}
WHITE_LIST = []

然后开始运行流程

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
if __name__ == '__main__':
init() # 进行init初始化操作
PASSWORD_DIC, THREAD_COUNT, TIMEOUT, WHITE_LIST = get_config() # 获取配置
thread.start_new_thread(monitor, ()) # 启动监控线程
while True:
task_id, task_plan, task_target, task_plugin = queue_get() # 任务信息获取
if task_id == '':
time.sleep(10)
continue
if PLUGIN_DB:
del sys.modules[PLUGIN_DB.keys()[0]] # 清理插件缓存
PLUGIN_DB.clear()
for task_netloc in task_target:
while True:
if int(thread._count()) < THREAD_COUNT:
if task_netloc[0] in WHITE_LIST: break
thread.start_new_thread(vulscan, (task_id, task_netloc, task_plugin))
break
else:
time.sleep(2)
if task_plan == 0: na_task.update({"_id": task_id}, {"$set": {"status": 2}})

准备工作

init

用于获取插件的信息

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
def init():
# 若数据库中存在插件信息,则直接返回,否则重新获取
if na_plugin.find().count() >= 1: return
script_plugin = []
json_plugin = []
# 获取vuldb中的插件
file_list = os.listdir(sys.path[0] + '/vuldb')
time_ = datetime.datetime.now()
for filename in file_list:
try:
# 插件分为json和py两种格式
if filename.split('.')[1] == 'py':
script_plugin.append(filename.split('.')[0])
if filename.split('.')[1] == 'json':
json_plugin.append(filename)
except:
pass
for plugin_name in script_plugin:
try:
# py格式的插件直接导入,然后读取对于变量,插入到mongodb中
res_tmp = __import__(plugin_name)
plugin_info = res_tmp.get_plugin_info()
plugin_info['add_time'] = time_
plugin_info['filename'] = plugin_name
plugin_info['count'] = 0
na_plugin.insert(plugin_info)
except:
pass
for plugin_name in json_plugin:
try:
# json格式的插件,用json解析后读取对应变量,插入到mongodb中
json_text = open(sys.path[0] + '/vuldb/' + plugin_name, 'r').read()
plugin_info = json.loads(json_text)
plugin_info['add_time'] = time_
plugin_info['filename'] = plugin_name
plugin_info['count'] = 0
del plugin_info['plugin']
na_plugin.insert(plugin_info)
except:
pass

get_config

1
2
3
4
5
6
7
8
9
10
11
12
13
14
def get_config():
try:
config_info = na_config.find_one({"type": "vulscan"})
pass_row = config_info['config']['Password_dic']
thread_row = config_info['config']['Thread']
timeout_row = config_info['config']['Timeout']
white_row = config_info['config']['White_list']
password_dic = pass_row['value'].split('\n')
thread_count = int(thread_row['value'])
timeout = int(timeout_row['value'])
white_list = white_row['value'].split('\n')
return password_dic, thread_count, timeout, white_list
except Exception, e:
print e

和之前nascan中的读取配置类似,只是这回读的是type为vulscan的配置

读取弱口令、线程数、timeout、白名单之类的配置参数,然后返回

monitor

新起了个monitor监测线程,监测插件的使用情况

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
def monitor():
# 引入全局变量
global PASSWORD_DIC, THREAD_COUNT, TIMEOUT, WHITE_LIST
while True:
# 获取正在执行的任务
queue_count = na_task.find({"status": 0, "plan": 0}).count()
if queue_count:
# 如果有正在执行的任务,则置为1
load = 1
else:
# 否则根据当前线程数,来判断插件是否在被使用
ac_count = thread._count()
load = float(ac_count - 4) / THREAD_COUNT
if load > 1: load = 1
if load < 0: load = 0
# 更新mongodb中的heatbeat集合,有插件正在扫描
na_heart.update({"name": "load"}, {"$set": {"value": load, "up_time": datetime.datetime.now()}})
PASSWORD_DIC, THREAD_COUNT, TIMEOUT, WHITE_LIST = get_config()
# 然后根据load值进行不同时间的休眠
if load > 0:
time.sleep(8)
else:
time.sleep(60)

然后进入到while True的循环

通过queue_get()进行任务参数的获取

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
def queue_get():
global TASK_DATE_DIC
# 获取未加载的task,更新为启动状态
task_req = na_task.find_and_modify(query={"status": 0, "plan": 0}, update={"$set": {"status": 1}}, sort={'time': 1})
if task_req:
# 如果存在,在TASK_DATE_DIC记录task,然后返回任务信息
TASK_DATE_DIC[str(task_req['_id'])] = datetime.datetime.now()
return task_req['_id'], task_req['plan'], task_req['target'], task_req['plugin']
else:
# 获取 plan != 0 的task列表
task_req_row = na_task.find({"plan": {"$ne": 0}})
if task_req_row:
for task_req in task_req_row:
# 判断是否需要再次启动任务
if (datetime.datetime.now() - task_req['time']).days / int(task_req['plan']) >= int(task_req['status']):
if task_req['isupdate'] == 1:
# 任务更新后,需要重新从info集合中获取ip和port
# 更新task集合的target
task_req['target'] = update_target(json.loads(task_req['query']))
na_task.update({"_id": task_req['_id']}, {"$set": {"target": task_req['target']}})
# 更新task集合中的status自增1
na_task.update({"_id": task_req['_id']}, {"$inc": {"status": 1}})
# 在TASK_DATE_DIC记录task
TASK_DATE_DIC[str(task_req['_id'])] = datetime.datetime.now()
# 返回task信息
return task_req['_id'], task_req['plan'], task_req['target'], task_req['plugin']
return '', '', '', ''

回到__main__

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
# 获取任务信息
task_id, task_plan, task_target, task_plugin = queue_get()
if task_id == '':
# 没有获取到task配置则sleep10秒后继续获取
time.sleep(10)
continue
if PLUGIN_DB:
# 当有插件缓存时清理插件缓存
# 后面扫描时会导入插件模块,删除之前导入的模块
del sys.modules[PLUGIN_DB.keys()[0]]
PLUGIN_DB.clear()
for task_netloc in task_target:
while True:
# 控制线程数
if int(thread._count()) < THREAD_COUNT:
# 剔除白名单ip
if task_netloc[0] in WHITE_LIST: break
# 启动vulscan扫描线程
thread.start_new_thread(vulscan, (task_id, task_netloc, task_plugin))
break
else:
time.sleep(2)
# task_plan == 0 为一次性任务
# 更新 status = 2
if task_plan == 0: na_task.update({"_id": task_id}, {"$set": {"status": 2}})

vulscan

__init__

1
2
3
4
5
6
def __init__(self, task_id, task_netloc, task_plugin):
self.task_id = task_id
self.task_netloc = task_netloc
self.task_plugin = task_plugin
self.result_info = ''
self.start()

设置好类变量,然后进入start()

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
def start(self):
self.get_plugin_info()
if '.json' in self.plugin_info['filename']: # json检测模式
try:
self.load_json_plugin() # 读取漏洞标示
self.set_request() # 标示符转换为请求
self.poc_check() # 检测
except Exception, e:
return
else: # py脚本检测模式
plugin_filename = self.plugin_info['filename']
self.log(str(self.task_netloc) + "call " + self.task_plugin)
if task_plugin not in PLUGIN_DB:
plugin_res = __import__(plugin_filename)
setattr(plugin_res, "PASSWORD_DIC", PASSWORD_DIC) # 给插件声明密码字典
PLUGIN_DB[plugin_filename] = plugin_res
try:
self.result_info = PLUGIN_DB[plugin_filename].check(str(self.task_netloc[0]), int(self.task_netloc[1]),TIMEOUT)
except:
return
self.save_request() # 保存结果

json格式检测

load_json_plugin()加载配置脚本

1
2
3
def get_plugin_info(self):
info = na_plugin.find_one({"name": self.task_plugin})
self.plugin_info = info

然后转换为http请求,返回请求句柄

1
2
3
4
5
6
7
8
9
10
def set_request(self):
# 构建url
url = 'http://' + self.task_netloc[0] + ":" + str(self.task_netloc[1]) + self.plugin_info['plugin']['url']
if self.plugin_info['plugin']['method'] == 'GET':
# 进行GET请求
request = urllib2.Request(url)
else:
# 否则进行post请求
request = urllib2.Request(url, self.plugin_info['plugin']['data'])
self.poc_request = request

然后验证poc是否有效

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
def poc_check(self):
try:
# 进行http请求,获取header和body信息
res = urllib2.urlopen(self.poc_request, timeout=30)
res_html = res.read(204800)
header = res.headers
# res_code = res.code
except urllib2.HTTPError, e:
# res_code = e.code
header = e.headers
res_html = e.read(204800)
except Exception, e:
return
try:
# 获取编码,然后转码
html_code = self.get_code(header, res_html).strip()
if html_code and len(html_code) < 12:
res_html = res_html.decode(html_code).encode('utf-8')
except:
pass
an_type = self.plugin_info['plugin']['analyzing']
vul_tag = self.plugin_info['plugin']['tag']
analyzingdata = self.plugin_info['plugin']['analyzingdata']
if an_type == 'keyword':
# 如果是关键词检测,判断是正则还是MD5的检测,然后进行比对
if analyzingdata.encode("utf-8") in res_html: self.result_info = vul_tag
elif an_type == 'regex':
if re.search(analyzingdata, res_html, re.I): self.result_info = vul_tag
elif an_type == 'md5':
md5 = hashlib.md5()
md5.update(res_html)
# 比对成功,则返回插件tag
if md5.hexdigest() == analyzingdata: self.result_info = vul_tag

py脚本检测

1
2
3
4
5
6
7
8
9
10
11
12
13
# 获取插件文件名
plugin_filename = self.plugin_info['filename']
self.log(str(self.task_netloc) + "call " + self.task_plugin)
if task_plugin not in PLUGIN_DB:
# 不在PLUGIN_DB中则导入
plugin_res = __import__(plugin_filename)
setattr(plugin_res, "PASSWORD_DIC", PASSWORD_DIC) # 给插件声明密码字典
PLUGIN_DB[plugin_filename] = plugin_res # 添加到PLUGIN_DB中
try:
# 启用py脚本的check方法,并设置timeout
self.result_info = PLUGIN_DB[plugin_filename].check(str(self.task_netloc[0]), int(self.task_netloc[1]),TIMEOUT)
except:
return

保存请求结果

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
def save_request(self):
# 判断是否扫描出结果了
if self.result_info:
try:
time_ = datetime.datetime.now()
self.log(str(self.task_netloc) + " " + self.result_info)
# 没有这条扫描记录则插件扫出的记录+1
v_count = na_result.find(
{"ip": self.task_netloc[0], "port": self.task_netloc[1], "info": self.result_info}).count()
if not v_count: na_plugin.update({"name": self.task_plugin}, {"$inc": {'count': 1}})
vulinfo = {"vul_name": self.plugin_info['name'], "vul_level": self.plugin_info['level'],
"vul_type": self.plugin_info['type']}
w_vul = {"task_id": self.task_id, "ip": self.task_netloc[0], "port": self.task_netloc[1],
"vul_info": vulinfo, "info": self.result_info, "time": time_,
"task_date": TASK_DATE_DIC[str(self.task_id)]}
# 添加扫描结果记录
na_result.insert(w_vul)
except Exception, e:
pass

到此也就完成了vulscan的扫描过程。

最后

巡风中对于扫描的分工,多线程的处理都有很多值得学习和借鉴的地方。而且几乎都有增加一些心跳线程,用于监测。

分析中难免有些不足或者错误,欢迎大佬们指出!

Reference

https://landgrey.me/xunfeng-nascan-analysis/

CATALOG
  1. 1. 前言
  2. 2. 整体架构逻辑
    1. 2.1. 文件结构
    2. 2.2. Run.sh
      1. 2.2.1. Run.py
      2. 2.2.2. Aider.py
      3. 2.2.3. NAScan.py
      4. 2.2.4. VulScan.py
  3. 3. nascan
    1. 3.1. 准备工作
    2. 3.2. 两个监测线程
      1. 3.2.1. monitor
      2. 3.2.2. cruise
    3. 3.3. start.py
      1. 3.3.1. 探测存活ip
      2. 3.3.2. masscan扫描全端口
      3. 3.3.3. scan.py
        1. 3.3.3.1. 前期准备
        2. 3.3.3.2. 端口扫描
        3. 3.3.3.3. 服务识别
        4. 3.3.3.4. web访问
  4. 4. vulscan
    1. 4.1. 准备工作
      1. 4.1.1. init
      2. 4.1.2. get_config
      3. 4.1.3. monitor
    2. 4.2. vulscan
      1. 4.2.1. json格式检测
      2. 4.2.2. py脚本检测
      3. 4.2.3. 保存请求结果
  5. 5. 最后
  6. 6. Reference