當前位置:
首頁 > 新聞 > 以溯源為目的蜜罐系統建設

以溯源為目的蜜罐系統建設

0x00、前言&思路

一直以來,蜜罐系統建立對溯源系統來說有雞肋之嫌,基於公網的蜜罐系統最早的產品形式是: wooyun當時做的一套蜜罐系統,主要是模擬底層服務弱口令(ssh、rdp、telnet)、資料庫服務弱口令(mysql、postgresql、redis)、web應用(nginx、apache、tomcat等)、應用漏洞(OpenSSL心臟出血、Java RMI 反序列化代碼執行、Struts2遠程代碼執行等)但是,這種主要是針對具有掃描器功能漏洞滲透自動化程序其作用,獲取攻擊源IP地址,獲取植入惡意樣本,然後沙盒運行樣本做C&C地址分析。但是為了獲取蜜罐樣本的全面性,我們需要投入大量的基礎設施建設成本,同時針對,我們需要DDoS定向溯源的地址還不一定有幫助。

那有沒有更好的方式方法,可以第一時間發現DDoS呢?我們能不能通過一種更簡便更輕資產的辦法獲得目前公網存活的惡意樣本?根據我們的DDoS溯源的以往的經驗上看,很多黑客喜歡通過http共享惡意文件方式,HFS是他們首選,,那我們是不是可以把全網HFS掃描出來,然後把這些惡意程序放到一個養雞場,當說發起DDoS攻擊的時候,我們養雞場中的Agent也接到攻擊命令,我們就知道哪個C&C Server發起功能,攻擊的目標是誰。在這裡我們引入一個概念:養雞場,就是把認為是惡意樣本都放到一起,正常飼養起來。

0x01、概念驗證&PoC

完成以上想法需要業務模塊:

1. 快速識別全國HFS伺服器

(1)現有網路空間搜索引擎系統調研

shodan

a. 免費版只能翻兩頁 20條記錄。

censys

a. 是免費也提供API操作,但是準確性存疑,免費的東西。。。

zoomeye

a. 要求實名註冊,國內的限制,這個。。。

FOFA

a. 專業版1個月3000提供API,在線查詢10000條/次,基礎版在線1000條。窮。。。

(2)掃描技術原理調研

masscan

a. banner識別有限

zmap+grab

a. censys就使用這種方式,結果不準確

nmap

a. 服務指紋識別準確但是速度是短板

Zoomeye

a. 前端:vue

b. 後端:Django

c. 資料庫:MongoDB+Elasticsearch

d. 掃描Worker/調度:Lucifer,

e. 組件指紋識別引擎Wmap/Xmap

某朋友的掃描系統

a. 客戶端:masscan+nmap+nginx+redis+haproxy

b. 伺服器端:Elasticsearch+ celery + django+ spark

(3)我的解決方案

目前系統處在驗證階段先簡單架構,先掃描一下國內IP端。

埠掃描:masscan

服務指紋識別:nmap

資料庫:postgresql

惡意文件驗證:下載HFS伺服器文件通過API發送virustotal校驗。

2. 養雞場部署

(1)蜜罐系統選擇

更少的資源跑更多的蜜罐(杜鵑)

(2)外網監控

IDS監控IP和DNS對外連接情況(Suricata+ES)

(3)DDoS監控

外網發包pps監控告警

0x02、代碼實現

在這之前寫過一篇有關掃描器的文章(Freebuf bt0sea),當時的需求只是監控本公司IP段高危埠連接。沒幾個B段。但是現在監控的是全中國IP地址段,這有點多,直接nmap猴年馬月也掃不完呀。

Problem 1:如果準確的拿到國內IP段

主要第一次做先把範圍縮小一點。

Apanic提供了每日更新的亞太地區IPv4,IPv6,AS號分配的信息表,訪問url是http://ftp.apnic.net/apnic/stats/apnic/delegated-apnic-latest我們過濾其中的ipv4數據即可。

apnic|CN|ipv4|42.156.64.0|16384|20110412|allocated

apnic|CN|ipv4|42.156.128.0|32768|20110316|allocated

apnic|CN|ipv4|42.157.0.0|65536|20110316|allocated

apnic|CN|ipv4|42.158.0.0|65536|20110224|allocated

apnic|CN|ipv4|42.159.0.0|65536|20110224|allocated

apnic|CN|ipv4|42.160.0.0|1048576|20110216|allocated

apnic|CN|ipv4|42.176.0.0|524288|20110222|allocated

apnic|CN|ipv4|42.184.0.0|131072|20110228|allocated

apnic|CN|ipv4|42.186.0.0|65536|20110316|allocated

apnic|CN|ipv4|42.187.0.0|16384|20110412|allocated

def getip(f,ipinfo=None):

if ipinfo is None:

ipinfo = []

for i in (f):

if i.find("apnic") > -1 and i.find("ipv4") > -1 and i.find("summary") == -1:

iplist = i.split("|")

country = iplist[1]

if country == "CN":

ipinfo.append(i)

return(ipinfo)

然後入庫sqlite。

Problem 2:埠掃描

啟動masscan 30個進程

def Scan():

try:

global g_queue

while not g_queue.empty():

item = g_queue.get()

result = "result"+item+".json"

p = subprocess.Popen("/root/masscan/masscan "+item+" -p80 -oJ "+result, shell=True)

p.wait()

if g_queue.qsize() == 0:

print (u"公網IP埠掃描結束")

return "ok"

except Exception,e:

print e

return e

if __name__ == "__main__":

reload(sys)

sys.setdefaultencoding("utf-8")

listThread = []

### 獲取ip列表加入全局變數中

conn = sqlite3.connect("ipwhois.db")

cur = conn.cursor()

print "Opened database successfully"

cursor = cur.execute("SELECT inetnum from ipwhois")

for row in cursor:

tmpstring = re.split(" - ",row[0])

# print tmpstring[0], tmpstring[1],

tmp=tmpstring[0]+"-"+tmpstring[1]

g_queue.put(tmp)

print tmp

conn.close()

for i in xrange(g_threadNum):

thread = ScanThread(Scan)

thread.start()

listThread.append(thread)

for thread in listThread:

thread.join()

print thread

一共大約3億2000萬IP,當然沒計算已經取消分配的IP段。大約掃描了3天時間。需要優化,我發現有的一個進程掃描一個/8的ip段需要一天的時間,2000多萬IP,所以可以切割成多個B段進行掃描。

Problem 3:服務指紋識別掃描

(1)提取IP

def store(result):

fw = open("iplist.txt", "a")

with open(result,"r") as f:

for line in f:

if line.startswith("{ "}:

try:

temp = json.loads(line[:-2])

fw.writelines(temp["ip"]+"n")

print temp["ip"]

except:

continue

def list_dir_names(urPath):

dirNames = os.listdir(urPath)

for ones in dirNames:

ones.decode("gbk")

return dirNames

if __name__ == "__main__":

reload(sys)

sys.setdefaultencoding("utf-8")

filelist=[]

filelist=glob.glob("*.json")

for item in filelist:

if os.path.getsize(item)!=0:

print item

store(item)

else:

continue

提取後大約開80埠IP為4685409。

def Scan():

try:

global g_queue

while not g_queue.empty():

item = g_queue.get()

result = "result"+item.strip()+".xml"

p = subprocess.Popen("/usr/bin/nmap -oX "+result+" -sV -p80 "+item, shell=True)

p.wait()

print "size = ", g_queue.qsize()

if g_queue.qsize() == 0:

print (u"公網IP掃描結束")

return "ok"

except Exception,e:

print e

return e

def ImportiIPlist():

storejson_file=open("iplist.txt","rb")

lines = storejson_file.readlines()

for line in lines:

print line

g_queue.put(line)

g_totalsize = g_queue.qsize()

print g_totalsize

if __name__ == "__main__":

reload(sys)

sys.setdefaultencoding("utf-8")

listThread = []

ImportiIPlist()

for i in xrange(g_threadNum):

thread = ScanThread(Scan)

thread.start()

listThread.append(thread)

for thread in listThread:

thread.join()

print thread

掃描大約5-6天時間。

Problem 4:掃描結果入庫

def Scan():

try:

timeout = 300

tableName = "%s_%s" % ("scanresult", time.strftime("%Y%m%d"))

conn1 = psycopg2.connect(database="postgres", user="postgres", password="Qwe123!@#", host="127.0.0.1",

port="5432")

cur1 = conn1.cursor()

g_totalsize = g_queue.qsize()

while not g_queue.empty():

item = g_queue.get()

print item

ctime = strftime("%Y-%m-%d %H:%M:%S", gmtime())

print ctime

nmap_report = NmapParser.parse_fromfile(item)

for scanned_hosts in nmap_report.hosts:

print scanned_hosts.address

for serv in scanned_hosts.services:

if serv.state == "open":

m = serv.service_dict.get("extrainfo", "")

print m

if m.find(""")!=-1:

pass

else:

sql = "insert into %s (task_id,ctime, address,port,service,product,product_version,product_extrainfo,os) VALUES ("%s","%s","%s","%s","%s","%s","%s","%s","%s")"

sqlCmd = sql%(tableName,g_task_id,ctime,scanned_hosts.address,str(serv.port),serv.service,serv.service_dict.get("product", ""),serv.service_dict.get("version", ""),serv.service_dict.get("extrainfo", ""),"NULL")

print sqlCmd

cur1.execute(sqlCmd)

conn1.commit()

print "size = ", g_queue.qsize()

g_size = g_queue.qsize()

num = "%.2f" %(float(g_size)/float(g_totalsize))

if g_queue.qsize() == 0:

print (u"公網IP掃描結束")

return "ok"

except Exception,e:

print e

pass

def CreateTable():

curC = connC.cursor()

sqlCreate = "create table if not exists %s (

task_id TEXT,

ctime TEXT,

address TEXT,

port TEXT,

service TEXT,

product TEXT ,

product_version TEXT,

product_extrainfo TEXT,

os TEXT

)"

tableName = "%s_%s"%("scanresult", time.strftime("%Y%m%d"))

sqlCmd = sqlCreate%tableName

curC.execute(sqlCmd)

curC.close()

connC.commit()

connC.close()

def SetTask():

filelist = []

filelist = glob.glob("*.xml")

for item in filelist:

if os.path.getsize(item) != 0:

print item

g_queue.put(item)

else:

continue

def main():

listThread = []

SetTask()

CreateTable()

for i in xrange(g_threadNum):

thread = ScanThread(Scan)

thread.start()

listThread.append(thread)

for thread in listThread:

thread.join()

print thread

if __name__ == "__main__":

reload(sys)

sys.setdefaultencoding("utf-8")

main()

SELECT DISTINCT(address),port,product,product_version,product_extrainfo FROM scanresult_20171231 WHERE product LIKE "%HttpFileServer%"

以下是部分結果

Virustotal

威脅情報查詢結果(360):

對外發包監控用開源IDS就好。

0x02、總結

通過以上的概念驗證,我們可以獲得我們自己想到的樣本。只要稍微整理一下驗證的代碼。就可以快速建立起一個養雞場,實時監控DDOS趨勢,為DDoS溯源添磚加瓦。


喜歡這篇文章嗎?立刻分享出去讓更多人知道吧!

本站內容充實豐富,博大精深,小編精選每日熱門資訊,隨時更新,點擊「搶先收到最新資訊」瀏覽吧!


請您繼續閱讀更多來自 嘶吼RoarTalk 的精彩文章:

如何使用MitmAP創建一個惡意接入點
嘶吼網站3.0新版上線:專業的內容都在這裡
Invoke-PSImage利用分析
新一代攻擊方式或將閃亮登場,聲波攻擊可使硬碟數據瞬間丟失
Tripwire:一款據稱可以發現任何網站漏洞的安全工具

TAG:嘶吼RoarTalk |