Using the Naive Bayes Algorithm for Web Security Detection

Source: https://blog.csdn.net/qq_37865996/article/details/87822640

1. Detecting Anomalous Operations

# -*- coding:utf-8 -*-

import numpy as np
from nltk.probability import FreqDist
from sklearn.neighbors import KNeighborsClassifier
from sklearn.naive_bayes import GaussianNB

# Number of operation sequences used for training (the remaining 150-N are used for testing)
N=90

def load_user_cmd_new(filename):
    cmd_list=[]
    dist=[]
    # Read the operation commands line by line; every 100 commands form one
    # operation sequence, stored in a list
    with open(filename) as f:
        i=0
        x=[]
        for line in f:
            line=line.strip('\n')  # strip the trailing newline
            x.append(line)         # append the command to the current sequence
            dist.append(line)
            i+=1
            if i == 100:
                cmd_list.append(x)
                x=[]
                i=0
    # Count how often each command appears; in the older nltk used here with
    # Python 2.7, FreqDist.keys() returns a list sorted by frequency
    fdist = FreqDist(dist).keys()
    return cmd_list,fdist

def load_user_cmd(filename):
    cmd_list=[]
    dist_max=[]
    dist_min=[]
    dist=[]
    # Read the commands line by line; every 100 commands form one operation sequence.
    # Using a word-set model, collect all commands and deduplicate them into a vocabulary.
    with open(filename) as f:
        i=0
        x=[]
        for line in f:
            line=line.strip('\n')
            x.append(line)
            dist.append(line)
            i+=1
            if i == 100:
                cmd_list.append(x)
                x=[]
                i=0
    # Keep the 50 most frequent and the 50 least frequent commands
    fdist = FreqDist(dist).keys()
    dist_max=set(fdist[0:50])
    dist_min = set(fdist[-50:])
    return cmd_list,dist_max,dist_min

def get_user_cmd_feature(user_cmd_list,dist_max,dist_min):
    user_cmd_feature=[]
    for cmd_block in user_cmd_list:
        # f1: number of distinct commands in this sequence
        f1=len(set(cmd_block))
        fdist = FreqDist(cmd_block).keys()
        # f2/f3: overlap of this sequence's 10 most/least frequent commands
        # with the global top-50/bottom-50
        f2=len(set(fdist[0:10]) & set(dist_max))
        f3=len(set(fdist[-10:]) & set(dist_min))
        x=[f1,f2,f3]
        user_cmd_feature.append(x)
    return user_cmd_feature
# Use the vocabulary built above as the vector space and convert each command
# sequence into the corresponding vector
def get_user_cmd_feature_new(user_cmd_list,dist):
    user_cmd_feature=[]
    for cmd_list in user_cmd_list:
        v=[0]*len(dist)
        for i in range(0,len(dist)):
            if dist[i] in cmd_list:
                v[i]+=1
        user_cmd_feature.append(v)

    return user_cmd_feature

def get_label(filename,index=0):
    x=[]
    with open(filename) as f:
        for line in f:
            line=line.strip('\n')
            x.append( int(line.split()[index]))
    return x

if __name__ == '__main__':
    user_cmd_list,dist=load_user_cmd_new("/Users/zhanglipeng/Data/MasqueradeDat/User3")
    user_cmd_feature=get_user_cmd_feature_new(user_cmd_list,dist)
    # The first 50 sequences are known to be normal; labels for the remaining
    # 100 sequences come from label.txt (column 2 corresponds to User3)
    labels=get_label("/Users/zhanglipeng/Data/MasqueradeDat/label.txt",2)
    y=[0]*50+labels

    x_train=user_cmd_feature[0:N]
    y_train=y[0:N]

    x_test=user_cmd_feature[N:150]
    y_test=y[N:150]

    # Train KNN for comparison
    neigh = KNeighborsClassifier(n_neighbors=3)
    neigh.fit(x_train, y_train)
    y_predict_knn=neigh.predict(x_test)
    print y_train

    # Train with Gaussian Naive Bayes
    clf = GaussianNB().fit(x_train, y_train)
    y_predict_nb=clf.predict(x_test)

    score=np.mean(y_test==y_predict_knn)*100
    print "KNN %d" % score

    score=np.mean(y_test==y_predict_nb)*100
    print "NB %d" % score

I ran into quite a few problems along the way; feel free to ask in the comments below and I will answer.

Output of this run:

(python27) zhanglipengdeMacBook-Pro:WSaL zhanglipeng$ python 7-1.py

[0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 1]

KNN 83

NB 83

On this dataset, both KNN and NB reach 83% accuracy for anomalous-operation detection.
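To make the word-set featurization concrete, here is a minimal sketch of what get_user_cmd_feature_new computes for one sequence (the commands are hypothetical):

# toy vocabulary and one (shortened) operation sequence
dist = ['ls', 'cd', 'ps', 'vi']
cmd_block = ['ls', 'ls', 'cd']

v = [0] * len(dist)
for i in range(0, len(dist)):
    if dist[i] in cmd_block:
        v[i] += 1
print v   # [1, 1, 0, 0]: 1 if the command appears in the sequence, 0 otherwise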

2. Detecting WebShells

import os
import numpy as np
from sklearn.feature_extraction.text import CountVectorizer
from sklearn.model_selection import cross_val_score
from sklearn.naive_bayes import GaussianNB

def load_file(file_path):
    t=""
    with open(file_path) as f:
        for line in f:
            line=line.strip('\n')
            t+=line
    return t
def load_files(path):
    files_list=[]
    for r, d, files in os.walk(path):
        for file in files:
            if file.endswith('.php'):
                # join against the directory actually being walked,
                # so files in subdirectories are found too
                file_path=os.path.join(r, file)
                print "Load %s" % file_path
                t=load_file(file_path)
                files_list.append(t)
    return files_list



if __name__ == '__main__':
    webshell_bigram_vectorizer = CountVectorizer(ngram_range=(2, 2), decode_error="ignore",
                                        token_pattern = r'\b\w+\b',min_df=1)
    webshell_files_list=load_files("/Users/zhanglipeng/Data/PHP-WEBSHELL/xiaoma/")
    x1=webshell_bigram_vectorizer.fit_transform(webshell_files_list).toarray()
    y1=[1]*len(x1)
    vocabulary=webshell_bigram_vectorizer.vocabulary_

    wp_bigram_vectorizer = CountVectorizer(ngram_range=(2, 2), decode_error="ignore",
                                        token_pattern = r'\b\w+\b',min_df=1,vocabulary=vocabulary)
    wp_files_list=load_files("/Users/zhanglipeng/Data/wordpress/")
    # with the vocabulary fixed to the webshell one, transform() is sufficient
    x2=wp_bigram_vectorizer.transform(wp_files_list).toarray()
    y2=[0]*len(x2)

    x=np.concatenate((x1,x2))
    y=np.concatenate((y1, y2))

    clf = GaussianNB()

    print  cross_val_score(clf, x, y, n_jobs=-1,cv=10)

For the black (webshell) sample set, a global vocabulary is generated with the 2-gram algorithm. The 2-gram here splits on words, so the token pattern is set to r'\b\w+\b'. CountVectorizer only counts how often each token appears in the text; ngram_range=(2, 2) means bigrams are used (see https://www.cnblogs.com/wzm-xu/p/4229819.html for more on n-grams), and decode_error="ignore" tells the vectorizer to ignore characters it cannot decode.

fit_transform combines fit and transform: it both learns the vocabulary and converts the input into count vectors.
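A minimal sketch of what this vectorizer produces (the two token streams are invented for illustration):

from sklearn.feature_extraction.text import CountVectorizer

corpus = ["eval base64_decode post", "eval gzinflate post"]
cv = CountVectorizer(ngram_range=(2, 2), decode_error="ignore",
                     token_pattern=r'\b\w+\b', min_df=1)
x = cv.fit_transform(corpus).toarray()
print cv.vocabulary_   # maps each 2-gram, e.g. 'eval base64_decode', to a column
print x                # one row of 2-gram counts per document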

The 10-fold cross-validation scores:

[0.75       0.6875     0.3125     0.875      0.625      0.375
 0.625      0.46666667 0.8        0.73333333]

The average accuracy is about 62%.

Next, let's build features from function calls instead, using 1-gram to generate the global vocabulary, in the hope of improving accuracy.

As background, an n-gram model starts from the exact chain-rule factorization of a sequence and then truncates each conditional to a short history; under the first-order Markov (bigram) assumption:

P(w1, w2, w3, …, wn) = P(w1)P(w2|w1)P(w3|w1w2)P(w4|w1w2w3)…P(wn|w1w2…wn-1)

≈ P(w1)P(w2|w1)P(w3|w2)P(w4|w3)…P(wn|wn-1)

For more on the role of feature selection, see https://www.cnblogs.com/nolonely/p/6435083.html.
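The token pattern used below, r'\b\w+\b\(|\'\w+\'', keeps function-call prefixes such as eval( and single-quoted string literals. A quick sketch on an invented PHP one-liner:

import re

r_token_pattern = r'\b\w+\b\(|\'\w+\''
php = "<?php eval(gzinflate('abc')); ?>"   # hypothetical sample
print re.findall(r_token_pattern, php)
# ['eval(', 'gzinflate(', "'abc'"]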

import os
import numpy as np
from sklearn.feature_extraction.text import CountVectorizer
from sklearn.model_selection import cross_val_score
from sklearn.naive_bayes import GaussianNB


# match function-call prefixes such as "eval(" and single-quoted string literals
r_token_pattern=r'\b\w+\b\(|\'\w+\''


def load_file(file_path):
    t=""
    with open(file_path) as f:
        for line in f:
            line=line.strip('\n')
            t+=line
    return t


def load_files(path):
    files_list=[]
    for r, d, files in os.walk(path):
        for file in files:
            if file.endswith('.php'):
                # join against the directory actually being walked,
                # so files in subdirectories are found too
                file_path=os.path.join(r, file)
                #print "Load %s" % file_path
                t=load_file(file_path)
                files_list.append(t)
    return files_list



if __name__ == '__main__':

    webshell_bigram_vectorizer = CountVectorizer(ngram_range=(1, 1), decode_error="ignore",
                                        token_pattern = r_token_pattern,min_df=1)
    webshell_files_list=load_files("/Users/zhanglipeng/Data/PHP-WEBSHELL/xiaoma/")
    x1=webshell_bigram_vectorizer.fit_transform(webshell_files_list).toarray()
    y1=[1]*len(x1)
    vocabulary=webshell_bigram_vectorizer.vocabulary_


    wp_bigram_vectorizer = CountVectorizer(ngram_range=(1, 1), decode_error="ignore",
                                        token_pattern = r_token_pattern,min_df=1,vocabulary=vocabulary)
    wp_files_list=load_files("/Users/zhanglipeng/Data/wordpress/")
    x2=wp_bigram_vectorizer.transform(wp_files_list).toarray()
    #print x2
    y2=[0]*len(x2)

    x=np.concatenate((x1,x2))
    y=np.concatenate((y1, y2))

    clf = GaussianNB()
    print vocabulary
    print cross_val_score(clf, x, y, n_jobs=-1,cv=10)

[0.75       0.75       0.9375     0.6875     0.8125     0.6875
 0.75       0.86666667 0.86666667 1.        ]

The average of these fold scores is about 81%.

The accuracy is still not ideal; the number of folds chosen for cross-validation also affects the result.
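To report one number instead of eyeballing the per-fold scores, average them (same API, small sketch):

scores = cross_val_score(clf, x, y, n_jobs=-1, cv=10)
print scores.mean(), scores.std()   # mean accuracy across the folds, and its spread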

3. Detecting DGA Domains

Domain generation algorithms (DGAs) make it much harder to fight and take down this type of botnet, so learning to understand the generation algorithm and its inputs, in order to deal with the generated domains in time, is well worth studying.

In this code, the author loads the Alexa top 1000 domains as white samples, labeled 0, and loads DGA domains from the cryptolocker and post-tovar-goz families, labeled 1 and 2 respectively. The domains are vectorized with character-level 2-grams. (The script below also keeps the HMM scoring and statistical-feature plotting functions from earlier experiments, but only nb_dga() is called in main.)
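With token_pattern=r"\w", each character is a token, so ngram_range=(2, 2) yields character bigrams. A minimal sketch (one white domain and one DGA-looking domain, for illustration):

from sklearn.feature_extraction.text import CountVectorizer

domains = ["google.com", "xsxqeadsbgvpdke.co.uk"]
cv = CountVectorizer(ngram_range=(2, 2), decode_error="ignore",
                     token_pattern=r"\w", min_df=1)
x = cv.fit_transform(domains).toarray()
print cv.vocabulary_   # character bigrams such as u'g o', u'o o', ...
print x                # one row of bigram counts per domain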

# -*- coding:utf-8 -*-

import re
import csv
import os
import numpy as np
from hmmlearn import hmm
from sklearn.externals import joblib
import matplotlib
matplotlib.use('TkAgg')
import matplotlib.pyplot as plt
from sklearn.feature_extraction.text import CountVectorizer
from sklearn.model_selection import cross_val_score
from sklearn.naive_bayes import GaussianNB

# Minimum domain length to consider
MIN_LEN=10

# Number of HMM hidden states
N=8
# Log-likelihood threshold
T=-50

# Model file name
FILE_MODEL="9-2.m"

def load_alexa(filename):
    domain_list=[]
    csv_reader = csv.reader(open(filename))
    for row in csv_reader:
        domain=row[1]
        if len(domain)>= MIN_LEN:
            domain_list.append(domain)
    return domain_list

def domain2ver(domain):
    # turn a domain into a sequence of character codes, one observation per character
    ver=[]
    for i in range(0,len(domain)):
        ver.append([ord(domain[i])])
    return ver

def train_hmm(domain_list):
    X = [[0]]
    X_lens = [1]
    for domain in domain_list:
        ver=domain2ver(domain)
        np_ver = np.array(ver)
        X=np.concatenate([X,np_ver])
        X_lens.append(len(np_ver))

    remodel = hmm.GaussianHMM(n_components=N, covariance_type="full", n_iter=100)
    remodel.fit(X,X_lens)
    joblib.dump(remodel, FILE_MODEL)

    return remodel

def load_dga(filename):
    domain_list=[]
    #xsxqeadsbgvpdke.co.uk,Domain used by Cryptolocker - Flashback DGA for 13 Apr 2017,2017-04-13,
    # http://osint.bambenekconsulting.com/manual/cl.txt
    with open(filename) as f:
        for line in f:
            domain=line.split(",")[0]
            if len(domain)>= MIN_LEN:
                domain_list.append(domain)
    return  domain_list

def test_dga(remodel,filename):
    x=[]
    y=[]
    dga_cryptolocke_list = load_dga(filename)
    for domain in dga_cryptolocke_list:
        domain_ver=domain2ver(domain)
        np_ver = np.array(domain_ver)
        pro = remodel.score(np_ver)
        #print  "SCORE:(%d) DOMAIN:(%s) " % (pro, domain)
        x.append(len(domain))
        y.append(pro)
    return x,y

def test_alexa(remodel,filename):
    x=[]
    y=[]
    alexa_list = load_alexa(filename)
    for domain in alexa_list:
        domain_ver=domain2ver(domain)
        np_ver = np.array(domain_ver)
        pro = remodel.score(np_ver)
        #print  "SCORE:(%d) DOMAIN:(%s) " % (pro, domain)
        x.append(len(domain))
        y.append(pro)
    return x, y

def show_hmm():
    domain_list = load_alexa("/Users/zhanglipeng/Data/top-1000.csv")
    if not os.path.exists(FILE_MODEL):
        remodel=train_hmm(domain_list)
    remodel=joblib.load(FILE_MODEL)
    x_3,y_3=test_dga(remodel, "/Users/zhanglipeng/Data/dga-post-tovar-goz-1000.txt")
    x_2,y_2=test_dga(remodel,"/Users/zhanglipeng/Data/dga-cryptolocke-1000.txt")
    x_1,y_1=test_alexa(remodel, "/Users/zhanglipeng/Data/test-top-1000.csv")
    fig,ax=plt.subplots()
    ax.set_xlabel('Domain Length')
    ax.set_ylabel('HMM Score')
    ax.scatter(x_3,y_3,color='b',label="dga_post-tovar-goz",marker='o')
    ax.scatter(x_2, y_2, color='g', label="dga_cryptolock",marker='v')
    ax.scatter(x_1, y_1, color='r', label="alexa",marker='*')
    ax.legend(loc='best')
    plt.show()


def get_aeiou(domain_list):
    x=[]
    y=[]
    for domain in domain_list:
        x.append(len(domain))
        count=len(re.findall(r'[aeiou]',domain.lower()))
        count=(0.0+count)/len(domain)
        y.append(count)
    return x,y

def show_aeiou():
    x1_domain_list = load_alexa("/Users/zhanglipeng/Data/top-1000.csv")
    x_1,y_1=get_aeiou(x1_domain_list)
    x2_domain_list = load_dga("/Users/zhanglipeng/Data/dga-cryptolocke-1000.txt")
    x_2,y_2=get_aeiou(x2_domain_list)
    x3_domain_list = load_dga("/Users/zhanglipeng/Data/dga-post-tovar-goz-1000.txt")
    x_3,y_3=get_aeiou(x3_domain_list)

    fig,ax=plt.subplots()
    ax.set_xlabel('Domain Length')
    ax.set_ylabel('AEIOU Score')
    ax.scatter(x_3,y_3,color='b',label="dga_post-tovar-goz",marker='o')
    ax.scatter(x_2, y_2, color='g', label="dga_cryptolock",marker='v')
    ax.scatter(x_1, y_1, color='r', label="alexa",marker='*')
    ax.legend(loc='best')
    plt.show()

def get_uniq_char_num(domain_list):
    x=[]
    y=[]
    for domain in domain_list:
        x.append(len(domain))
        count=len(set(domain))
        count=(0.0+count)/len(domain)
        y.append(count)
    return x,y

def show_uniq_char_num():
    x1_domain_list = load_alexa("/Users/zhanglipeng/Data/top-1000.csv")
    x_1,y_1=get_uniq_char_num(x1_domain_list)
    x2_domain_list = load_dga("/Users/zhanglipeng/Data/dga-cryptolocke-1000.txt")
    x_2,y_2=get_uniq_char_num(x2_domain_list)
    x3_domain_list = load_dga("/Users/zhanglipeng/Data/dga-post-tovar-goz-1000.txt")
    x_3,y_3=get_uniq_char_num(x3_domain_list)

    fig,ax=plt.subplots()
    ax.set_xlabel('Domain Length')
    ax.set_ylabel('UNIQ CHAR NUMBER')
    ax.scatter(x_3,y_3,color='b',label="dga_post-tovar-goz",marker='o')
    ax.scatter(x_2, y_2, color='g', label="dga_cryptolock",marker='v')
    ax.scatter(x_1, y_1, color='r', label="alexa",marker='*')
    ax.legend(loc='best')
    plt.show()


def count2string_jarccard_index(a,b):
    # Jaccard index over the two strings' sets of character 2-grams
    # (each string is padded with a leading and trailing space)
    x=set(' '+a[0])
    y=set(' '+b[0])
    for i in range(0,len(a)-1):
        x.add(a[i]+a[i+1])
    x.add(a[len(a)-1]+' ')

    for i in range(0,len(b)-1):
        y.add(b[i]+b[i+1])
    y.add(b[len(b)-1]+' ')

    # Jaccard index = |intersection| / |union|
    return (0.0+len(x&y))/len(x|y)


def get_jarccard_index(a_list,b_list):
    x=[]
    y=[]
    for a in a_list:
        j=0.0
        for b in b_list:
            j+=count2string_jarccard_index(a,b)
        x.append(len(a))
        y.append(j/len(b_list))

    return x,y


def show_jarccard_index():
    x1_domain_list = load_alexa("/Users/zhanglipeng/Data/top-1000.csv")
    x_1,y_1=get_jarccard_index(x1_domain_list,x1_domain_list)
    x2_domain_list = load_dga("/Users/zhanglipeng/Data/dga-cryptolocke-1000.txt")
    x_2,y_2=get_jarccard_index(x2_domain_list,x1_domain_list)
    x3_domain_list = load_dga("/Users/zhanglipeng/Data/dga-post-tovar-goz-1000.txt")
    x_3,y_3=get_jarccard_index(x3_domain_list,x1_domain_list)

    fig,ax=plt.subplots()
    ax.set_xlabel('Domain Length')
    ax.set_ylabel('JARCCARD INDEX')
    ax.scatter(x_3,y_3,color='b',label="dga_post-tovar-goz",marker='o')
    ax.scatter(x_2, y_2, color='g', label="dga_cryptolock",marker='v')
    ax.scatter(x_1, y_1, color='r', label="alexa",marker='*')
    ax.legend(loc='lower right')
    plt.show()

def nb_dga():
    x1_domain_list = load_alexa("/Users/zhanglipeng/Data/top-1000.csv")
    x2_domain_list = load_dga("/Users/zhanglipeng/Data/dga-cryptolocke-1000.txt")
    x3_domain_list = load_dga("/Users/zhanglipeng/Data/dga-post-tovar-goz-1000.txt")

    x_domain_list=np.concatenate((x1_domain_list, x2_domain_list,x3_domain_list))

    y1=[0]*len(x1_domain_list)
    y2=[1]*len(x2_domain_list)
    y3=[2]*len(x3_domain_list)

    y=np.concatenate((y1, y2,y3))


    cv = CountVectorizer(ngram_range=(2, 2), decode_error="ignore",
                         token_pattern=r"\w", min_df=1)
    x= cv.fit_transform(x_domain_list).toarray()

    clf = GaussianNB()
    print cross_val_score(clf, x, y, n_jobs=-1, cv=3)

if __name__ == '__main__':
    nb_dga()

[0.94636872 0.93161435 0.93834081]

The average accuracy is about 94%.

4. Detecting DDoS

In the KDD99 dataset, we can draw on the time-based and host-based network-traffic statistics that characterize a DDoS attack, pick the features we consider key as sample features, and then apply the naive Bayes algorithm:
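For orientation, the column slices selected in get_apache2andNormal below map to the following KDD99 fields (per the standard kddcup.names ordering; this mapping is my annotation, not part of the original script):

# KDD99 record layout for the columns used here
#   x1[2]      service  -> filtered on 'http'
#   x1[41]     label    -> 'apache2.' (a DoS attack) vs 'normal.'
#   x1[0]      duration
#   x1[4:8]    src_bytes, dst_bytes, land, wrong_fragment
#   x1[22:30]  time-based traffic features: count, srv_count, serror_rate,
#              srv_serror_rate, rerror_rate, srv_rerror_rate, same_srv_rate, diff_srv_rate
#   x1[31:40]  host-based traffic features: dst_host_count ... dst_host_rerror_rate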

# -*- coding:utf-8 -*-

from sklearn.model_selection import cross_val_score
from sklearn.naive_bayes import GaussianNB


def load_kdd99(filename):
    x=[]
    with open(filename) as f:
        for line in f:
            line=line.strip('\n')
            line=line.split(',')
            x.append(line)
    return x

def get_apache2andNormal(x):
    v=[]
    w=[]
    y=[]
    for x1 in x:
        # keep only http records labeled apache2 (a DoS attack) or normal
        if ( x1[41] in ['apache2.','normal.'] ) and ( x1[2] == 'http' ):
            if x1[41] == 'apache2.':
                y.append(1)
            else:
                y.append(0)
            # select duration, byte counts, and the time-based and
            # host-based traffic statistics as features
            x1 = [x1[0]] + x1[4:8]+x1[22:30]+x1[31:40]
            v.append(x1)

    # convert every field from string to float
    for x1 in v :
        v1=[]
        for x2 in x1:
            v1.append(float(x2))
        w.append(v1)
    return w,y

if __name__ == '__main__':
    v=load_kdd99("/Users/zhanglipeng/Data/kddcup99/corrected")
    x,y=get_apache2andNormal(v)
    clf = GaussianNB()
    print  cross_val_score(clf, x, y, n_jobs=-1, cv=10)



[0.99925094 0.99875156 0.99950062 0.99950062 0.996004   0.9995005
 0.997003   0.98975768 0.99975019 0.99925056]

The average accuracy is above 99%.

5. Recognizing Captchas

In the MNIST dataset, each 28×28 image has already been flattened into a 784-dimensional feature vector.
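A quick way to confirm that layout (a sketch assuming the classic mnist.pkl.gz distributed via deeplearning.net, which stores (data, label) tuples of 50,000/10,000/10,000 examples):

import gzip
import pickle

with gzip.open('/Users/zhanglipeng/Data/MNIST/mnist.pkl.gz') as fp:
    training_data, valid_data, test_data = pickle.load(fp)

x1, y1 = training_data
print x1.shape   # (50000, 784): one flattened 28x28 image per row
print y1.shape   # (50000,)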

# -*- coding:utf-8 -*-

import gzip
import pickle
from sklearn.model_selection import cross_val_score
from sklearn.naive_bayes import GaussianNB


def load_data():
    with gzip.open('/Users/zhanglipeng/Data/MNIST/mnist.pkl.gz') as fp:
        training_data, valid_data, test_data = pickle.load(fp)
    return training_data, valid_data, test_data


if __name__ == '__main__':
    training_data, valid_data, test_data=load_data()
    x1,y1=training_data
    x2,y2=test_data
    clf = GaussianNB()
    clf.fit(x1, y1)
    # note: cross_val_score clones the estimator and refits it on folds of x2,
    # so the fit on the training set above does not influence these scores
    print cross_val_score(clf, x2, y2, scoring="accuracy")

[0.53684841 0.58385839 0.6043857 ]

The accuracy here is only about 57%.
