版权声明:本文为博主原创文章,未经博主允许不得转载。 https://blog.csdn.net/qq_39591494/article/details/82898384
使用Python对Access.log日志进行分析
**** 2018-9-29
贴代码:
#!/usr/bin/env python
# -*- coding:utf-8 -*-
import os
import re
import pickle
import sys
from color_me import ColorMe
__author__ = "YanZanG"
File_Path = r"F:\python_test1\Yankerp_test\result_test"
Log_Path = os.path.join(File_Path,"access.log")
IP_find = re.compile(r"((1\d{2}|25[0-5]|2[0-4]\d|[1-9]?\d)\.){3}(25[0-5]|2[0-4]\d|1\d{2}|[1-9]?\d)")
Url_find = re.compile(r"[a-zA-z]+://[^\s]*")
class Access(object):
"""
程序功能类
1:提供分析IP地址
2:提供分析用户请求Url
3:提供访问web页面的状态码
4:将数据写入self.dict 同时pickle保存至目录中.
"""
def __init__(self, log_path, File_Path):
self.IP_database = {}
self.Url_database = {}
self.Web_status = {}
self.log_path = log_path
self.File_Path = File_Path
def ip_address(self):
"""分析日志IP地址数量"""
with open(self.File_Path) as f:
Log_data = f.readlines()
try:
for log_ips in Log_data:
ip_search = IP_find.search(log_ips)
ip_result = ip_search.group()
if ip_result:
self.IP_database[ip_result] = self.IP_database.get(ip_result, 0) + 1
self.save_ip()
else:
print("error!!!")
except Exception as e:
print(e)
def request_url(self):
"""分析日志请求url地址数量"""
with open(self.File_Path) as f:
data_result = f.readlines()
try:
for line in data_result:
web_url = Url_find.search(line)
if web_url:
self.Url_database[web_url.group()] = self.Url_database.get(web_url.group(), 0) + 1
self.save_url()
except Entrance as e:
print(e)
def web_status(self):
"""分析日志请求状态码信息数量"""
with open(self.File_Path) as f:
web_status = f.readlines()
try:
for line in web_status:
web = line.split()[8]
if web:
self.Web_status[web] = self.Web_status.get(web, 0) + 1
except Exception as e:
print(e)
for k, v in self.Web_status.items():
print(f"状态码:{k} 访问次数:{v}次")
def save_url(self):
"""保存日志url信息"""
with open(f"{self.log_path}/web_url_log.pkl", "wb") as f:
pickle.dump(self.Url_database, f)
def save_ip(self):
"""保存日志IP地址访问信息"""
with open(f"{self.log_path}/Access_log.pkl", "wb") as f:
pickle.dump(self.IP_database, f)
def read_url(self):
"""读取日志url信息"""
with open(f"{self.log_path}/web_url_log.pkl", "rb") as f:
url_data = pickle.load(f)
self.Url_database.update(url_data)
for k, v in self.Url_database.items():
print(f"用户访问的网址为:{k} , 共访问次数为:{v}次")
read_url_prompt = ColorMe(f"分析日志Url数据已保存在:{Log_Path}\web_url_log.pkl 文件中").green()
print(read_url_prompt)
def read_data(self):
"""读取日志IP地址信息"""
with open(f"{self.log_path}/Access_log.pkl", "rb") as f:
web_data = pickle.load(f)
self.IP_database.update(web_data)
for k, v in self.IP_database.items():
print(f"用户IP地址为:'{k}' , 访问网站次数为:{v}次")
read_data_prompt = ColorMe(f"分析日志IP地址数据已保存在:{Log_Path}\Access_log.pkl 文件中").green()
print(read_data_prompt)
class Entrance(object):
"""
程序入口
1:使用反射调用类方法
2:提供用户输入信息,调用Access类中的方法
"""
def __init__(self, log_path, File_Path):
self.log_path = log_path
self.File_Path = File_Path
def IP(self):
"""获取用户输入的目录及文件名称赋值-->Access类的属性,调用ip_address方法"""
IP = Access(self.log_path, self.File_Path)
IP.ip_address()
IP.read_data()
def web_url(self):
"""获取用户输入的目录及文件名称赋值-->Access类的属性,调用request_url方法"""
URL = Access(self.log_path,self.File_Path)
URL.request_url()
URL.read_url()
def web_status(self):
"""获取用户输入的目录及文件名称赋值-->Access类的属性,调用web_status方法"""
Status = Access(self.log_path, self.File_Path)
Status.web_status()
def exit(self):
"""入口程序退出方法"""
exit_result = ColorMe("退出成功,欢迎您再次使用~").green()
print(exit_result)
sys.exit()
def menu(self):
"""Welcome欢迎信息- 输入菜单提供用户选择"""
print(f"Welcome to {__author__} log analysis".center(60,"-"))
menu = {
"1" : "分析日志IP地址",
"2" : "分析日志访问URL",
"3" : "分析日志状态码",
"Q" : "退出此程序"
}
menu2 = {
"1" : "IP",
"2" : "web_url",
"3" : "web_status",
"Q" : "exit"
}
Count = True
while(Count):
for k, v in menu.items():
print(f"{k}、{v}")
Your_input = input("请您输入您需要分析的日志内容:".strip()).upper()
if hasattr(self, menu2.get(Your_input)):
func = getattr(self, menu2.get(Your_input))
func()
def menu():
i = 0
e = 3
while (i < e):
OS = ColorMe("系统提示:路径例:‘F:\python_test1\eeeee\’, 文件名称例:'access.log'").green()
print(OS)
File_Path = input("请您输入日志所存放的目录路径:".strip())
Log_Path = input("请您输入日志名称:".strip())
Log_data = os.path.join(File_Path, Log_Path)
if os.path.exists(Log_data):
de_Sssful = ColorMe("检测目录文件成功!!!").green()
print(de_Sssful)
A = Entrance(File_Path, Log_data)
A.menu()
else:
i += 1
menu_result = ColorMe(f"请您输入正确的目录,以及文件名称,您还有{e - i}次输入机会").red()
print(menu_result)
if __name__ == '__main__':
menu()
color_me
#!/usr/bin/env Python
#-*- coding:utf-8 -*-
__author__ = 'De8ug'
class ColorMe:
"""
give me color see see...
实际用起来很简单:
ColorMe('somestr').blue()
"""
def __init__(self, some_str):
self.color_str = some_str
def blue(self):
str_list = ["\033[34;1m", self.color_str, "\033[0m"]
return ''.join(str_list) # "\033[34;1m" + self.color_str + "\033[0m"
def green(self):
str_list = ["\033[32;1m", self.color_str, "\033[0m"]
return ''.join(str_list) # "\033[34;1m" + self.color_str + "\033[0m"
def yellow(self):
str_list = ["\033[33;1m", self.color_str, "\033[0m"]
return ''.join(str_list) # "\033[34;1m" + self.color_str + "\033[0m"
def red(self):
str_list = ["\033[31;1m", self.color_str, "\033[0m"]
return ''.join(str_list) # "\033[34;1m" + self.color_str + "\033[0m"
def main():
ColorMe('somestr').blue()
if __name__ == '__main__':
main()