import re
import os
import json
import shutil
import time
from pathlib import Path
import zipfile
import nmap
import subprocess
import datetime
import platform
from collections import defaultdict
from urllib.parse import unquote
import requests
def test_func():
    """Smoke-test helper: print a fixed marker message to stdout."""
    marker = '这里是test_func'
    print(marker)
"""
通用部分
"""
def normal_pass_init(self):
    """No-op initializer placeholder — intentionally does nothing."""
    pass
"""
文件操作
---------------------------------------------------------------------------------------------------------------------
"""
@staticmethod
def read_file(path, use_json=False):
with open(path) as f:
if use_json:
return json.load(f)
else:
return f.read()
@staticmethod
def write_file(path, content, use_json=False):
with open(path, 'w') as f:
if use_json:
json.dump(content, f)
else:
f.write(content)
@staticmethod
def create_folder(path):
path = Path(path)
if not path.exists():
path.mkdir(parents=True, exist_ok=True)
return f"文件夹已创建: {path}"
else:
return f"文件夹已创建: {path}"
@staticmethod
def copy_folder(src, dst):
"""
复制文件夹到指定位置
:param src: 源文件夹路径
:param dst: 目标文件夹路径
"""
try:
# 如果目标文件夹已存在,先删除
if not os.path.exists(dst):
# 复制文件夹
shutil.copytree(src, dst)
return f"文件夹已成功复制到: {dst}"
else:
return f"文件夹{dst}已经存在,跳过"
except Exception as e:
raise e
@staticmethod
def copy_all_files(src_folder, dst_folder):
"""
复制源文件夹下的所有文件到目标文件夹(不包含子文件夹)
:param src_folder: 源文件夹路径
:param dst_folder: 目标文件夹路径
"""
try:
# 确保目标文件夹存在
if not os.path.exists(dst_folder):
os.makedirs(dst_folder)
# 遍历源文件夹中的所有文件
for filename in os.listdir(src_folder):
src_file = os.path.join(src_folder, filename)
dst_file = os.path.join(dst_folder, filename)
# 如果是文件,则复制
if os.path.isfile(src_file):
if not os.path.exists(dst_file):
shutil.copy2(src_file, dst_file)
except Exception as e:
raise e
@staticmethod
def delete_all_files_in_folder(folder_path):
"""
删除文件夹下的所有文件,但保留文件夹本身
:param folder_path: 文件夹路径
"""
try:
# 遍历文件夹中的所有文件
for filename in os.listdir(folder_path):
file_path = os.path.join(folder_path, filename)
# 如果是文件,则删除
if os.path.isfile(file_path) or os.path.islink(file_path):
os.unlink(file_path)
# 如果是子文件夹,则递归删除(可选)
elif os.path.isdir(file_path):
shutil.rmtree(file_path)
except Exception as e:
raise e
@staticmethod
def handle_uploaded_zip(zip_file, extract_to):
"""
处理上传的 ZIP 文件并解压到指定目录
参数:
zip_file: Django 的 UploadedFile 对象
extract_to: 解压目标路径
返回:
解压后的文件列表
"""
try:
# 确保目标目录存在
os.makedirs(extract_to, exist_ok=True)
# 使用 zipfile 解压
with zipfile.ZipFile(zip_file, 'r') as zip_ref:
# 获取 ZIP 内所有文件名
file_list = zip_ref.namelist()
# 解压所有文件到目标目录
zip_ref.extractall(extract_to)
# 返回解压后的文件绝对路径列表
return [os.path.join(extract_to, f) for f in file_list]
except zipfile.BadZipFile:
raise ValueError("上传的文件不是有效的 ZIP 文件")
except Exception as e:
raise RuntimeError(f"解压文件时出错: {str(e)}")
"""
前端操作
---------------------------------------------------------------------------------------------------------------------
"""
@staticmethod
def vuetojson(**kwargs):
    """
    Parse a Vue/HTML template string into a list of tag dicts.
    kwargs:
        fileContent: the template source text
        file_path: recorded verbatim in each tag dict under "file"
        is_save_to_file: when True, also dump the result as JSON into temp_folder
        projectpath: used to derive the dump file name (text after the first '/')
        temp_folder: folder prefix for the JSON dump
    Each dict carries: name/name_cn (tag name), include_tag_attribute,
    include_tag_visibleword (text between this tag and the previous one),
    file, and type ("normalsgin" for <tag ...>, "singlesgin" for <tag .../>).
    """
    # Match either a tag containing v-if="..." or any other tag.
    patten = r'<[^>]*\bv-if="[^"]*".*?>|<[^>]+>'
    html_content = kwargs["fileContent"]
    all_tags = re.finditer(patten, html_content)
    last_end = 0
    i = 0
    html_dict_list = []
    for item in all_tags:
        html_dict = {
            "name": "",
            "name_cn": "",
            "include_tag_attribute": [],
            "include_tag_visibleword": [],
            "file": kwargs["file_path"],
            "type": "",
        }
        i = i + 1
        start = item.start()
        # Extract the visible text sitting between the previous tag and this one.
        if last_end == 0:
            last_end = item.end()
        else:
            middle_content = html_content[last_end:start]
            # Strip all whitespace from the in-between text.
            middle_content = re.sub("\s", "", middle_content)
            if middle_content != "":
                html_dict["include_tag_visibleword"] = [{"name": middle_content, "name_cn": middle_content}]
            last_end = item.end()
        # Classify self-closing tags (<tag/>) vs normal tags.
        judge_single_html_tag = r'/>'
        is_single_html_tag = re.search(judge_single_html_tag, html_content[item.start():item.end()])
        if is_single_html_tag is None:
            html_dict["type"] = "normalsgin"
        else:
            html_dict["type"] = "singlesgin"
        # Normalize whitespace before extracting the tag name and attributes.
        expel_multiply_space_ = r'\s+'
        new_html_content = re.sub('/>', ">", html_content[item.start():item.end()])
        expel_multiply_space = re.sub(expel_multiply_space_, " ", new_html_content)
        expel_newline_ = r'\n'
        expel_newline = re.sub(expel_newline_, "", expel_multiply_space)
        # Strip the angle brackets ('>' only when outside quoted attribute values).
        expel_jian_ = r'<|>(?=(?:[^"]*"[^"]*")*[^"]*$)'
        expel_jian = re.sub(expel_jian_, '', expel_newline)
        if " " in expel_jian:
            # Tag name = everything before the first whitespace.
            split_expel_jian = re.search(r'\s', expel_jian)
            html_dict["name"] = expel_jian[:split_expel_jian.start()]
            html_dict["name_cn"] = expel_jian[:split_expel_jian.start()]
            # The remainder holds the attribute text.
            attrs_str = expel_jian[split_expel_jian.end():]
            # Collect value-less (boolean) attributes first.
            single_attrs = re.finditer(r'[A-Za-z-]+(?=\s)(?![^"]*"(?=\s))|(?<=\s)([A-Za-z-]+)(?![^"]*")', attrs_str)
            for single_attr in single_attrs:
                html_dict["include_tag_attribute"].append(
                    {"name": single_attr.group(), "name_cn": single_attr.group(),
                     "attribute_value": [{"name": single_attr.group(), "name_cn": single_attr.group()}]})
            # Remove those boolean attributes, leaving only key="value" pairs.
            expel_single_attr_attrs_str = re.sub('[A-Za-z-]+\s(?![^"]*"(?=\s))|\s([A-Za-z-]+)(?![^"]*")', '',
                                                 attrs_str)
            if len(expel_single_attr_attrs_str) != 0:
                split_attr = re.split(r'(?<=[\"])\s', expel_single_attr_attrs_str)
                for attr_item in split_attr:
                    key_value_list = re.split(r'(?<=[A-Za-z])=(?=")', attr_item)
                    if len(key_value_list) == 2:
                        html_dict["include_tag_attribute"].append(
                            {"name": key_value_list[0], "name_cn": key_value_list[0],
                             "attribute_value": [{"name": key_value_list[1], "name_cn": key_value_list[1]}]})
        else:
            # No whitespace: the whole remainder is an attribute-less tag name.
            html_dict["name"] = expel_jian
            html_dict["name_cn"] = expel_jian
        html_dict_list.append(html_dict)
    if kwargs["is_save_to_file"]:
        jsonname = kwargs["projectpath"]
        jsonanmesearch = re.search('\/', kwargs["projectpath"])
        if not jsonanmesearch == None:
            jsonname = kwargs["projectpath"][jsonanmesearch.end():]
        file_name = f'{jsonname}.json'
        # NOTE(review): '+' inside os.path.join concatenates temp_folder and the
        # file name directly — probably intended os.path.join(temp_folder, file_name).
        # This only works if temp_folder ends with a path separator; verify callers.
        jsonpath = os.path.join(kwargs['temp_folder'] + file_name)
        with open(jsonpath, 'w') as file_object:
            json.dump(html_dict_list, file_object)
    return html_dict_list
@staticmethod
def jsontovue(**kwargs):
json_data = kwargs["json_data"]
text = ""
for item in json_data:
# 重构标签
html_tag = "<" + item["name"]
if item["type"] == "normalsgin":
if not "/" in item["name"]:
for value in item["include_tag_attribute"]:
if value["name"] == value["attribute_value"][0]["name"]:
html_tag = html_tag + " " + value["name"]
else:
html_tag = html_tag + " " + value["name"] + '=' + value["attribute_value"][0]["name"]
html_tag = html_tag + ">" + "\n"
else:
html_tag = html_tag + ">" + "\n"
else:
for value in item["include_tag_attribute"]:
if value["name"] == value["attribute_value"][0]["name"]:
html_tag = html_tag + " " + value["name"]
else:
html_tag = html_tag + " " + value["name"] + '=' + value["attribute_value"][0]["name"]
html_tag = html_tag + "/>" + "\n"
if not item["name"] == "":
if not len(item["include_tag_visibleword"]) == 0:
text = text + item["include_tag_visibleword"][0]["name"]
text = text + html_tag
else:
text = text + html_tag
return text
def read_interaction_data_tree(self, item, container=None):
    """
    Recursively convert an interaction-data tree (list of node dicts) into a
    plain dict. 'list' nodes wrap their recursed children in a one-element
    list; 'dict' nodes recurse directly; any other node, and empty 'variable'
    list/dict nodes, map key -> value.
    """
    result = {}
    for node in item:
        kind = node['data_type']
        if kind == 'list':
            if len(node["children"]) > 0:
                nested = self.read_interaction_data_tree(node["children"], [])
                result[node["key"]] = [nested]
            elif node["value_type"] == 'variable':
                result[node['key']] = node['value']
        elif kind == 'dict':
            if len(node["children"]) > 0:
                result[node['key']] = self.read_interaction_data_tree(node["children"], {})
            elif node["value_type"] == 'variable':
                result[node['key']] = node['value']
        else:
            result[node['key']] = node['value']
    return result
"""
API部分
---------------------------------------------------------------------------------------------------------------------
"""
def group_tree_data(self, *args, **kwargs):
    """
    Group source lines by indentation depth.
    kwargs:
        code_list: list of code lines (depth = count of 4-char indent units + 1)
    Returns {"group<depth>": [node, ...]} where each node holds the stripped
    line text, its rendered indentation, original line number, depth, and a
    per-depth serial number. Lines that are empty after stripping are skipped.
    """
    indent_pattern = r'(\t{4}|\s{4})'
    # Every distinct depth present in the input (empty lines count as depth 1).
    depths = {len(re.findall(indent_pattern, line)) + 1 for line in kwargs["code_list"]}
    groups = {f"group{depth}": [] for depth in depths}
    # Fill each depth's group in source order, numbering its members.
    for depth in depths:
        serial = 0
        for line_number, line in enumerate(kwargs["code_list"]):
            line_depth = len(re.findall(indent_pattern, line)) + 1
            stripped = re.sub(indent_pattern, "", line)
            if not stripped:
                continue  # skip empty lines
            if line_depth == depth:
                groups[f"group{line_depth}"].append({
                    "lines_data": {
                        "syntax_statement": stripped,      # code text
                        "syntax_indentation": line_depth * 4,  # python indent width
                    },
                    "line_number": line_number,
                    "indentation": line_depth,
                    "serial_number": serial,
                })
                serial = serial + 1
    return groups
def api_code_transform_to_tree(self, *args, **kwargs):
    """
    Assemble grouped line data into a code tree and serialize it.
    kwargs:
        group_data: mapping "groupN" -> node dicts (from group_tree_data)
        PythonCodeTreeNode: node class, called as (data, parent=..., indentation=..., serial_number=...)
        PythonCodeTreeController: class providing find_data / add_data / to_array
    Returns PythonCodeTreeController.to_array(root).
    """
    # Synthetic root node for the whole code tree.
    root = kwargs["PythonCodeTreeNode"]({
        "syntax_statement": "Root",  # code text
        "syntax_indentation": 0,  # python indent width
    }, indentation=0, serial_number=0)
    parent = {}
    # Pass 1: for each node, find the closest preceding line that is exactly
    # one indentation level shallower and record its depth/serial under the
    # (oddly named) "test" key.
    # NOTE(review): `parent` persists across iterations — a node with no
    # shallower predecessor inherits the previous node's parent info; verify.
    for group_key, group_value in kwargs["group_data"].items():
        for node in group_value:
            min_difference = float('inf')
            if f"group{node['indentation'] - 1}" in kwargs["group_data"]:
                for item in kwargs["group_data"][f"group{node['indentation'] - 1}"]:
                    difference = node["line_number"] - item['line_number']
                    if 1 <= difference < min_difference:
                        min_difference = difference
                        parent = {
                            "depth": item["indentation"],
                            "serial_number": item["serial_number"],
                        }
            node.update({"test": parent})
    # Pass 2: materialize the tree — depth-1 nodes hang off the root, deeper
    # nodes off the parent located via find_data.
    for item in kwargs["group_data"].values():
        for ss in item:
            if ss["indentation"] == 1:
                node = kwargs["PythonCodeTreeNode"](ss["lines_data"], parent=root, indentation=ss["indentation"],
                                                    serial_number=ss["serial_number"])
            else:
                parent = kwargs["PythonCodeTreeController"].find_data(root, ss["test"]["depth"],
                                                                      ss["test"]["serial_number"])
                node = kwargs["PythonCodeTreeNode"](ss["lines_data"], parent=parent, indentation=ss["indentation"],
                                                    serial_number=ss["serial_number"])
            kwargs["PythonCodeTreeController"].add_data(node)
    return kwargs["PythonCodeTreeController"].to_array(root)
def api_transform_to_json(self, *args, **kwargs):
    """
    Normalize a python-ish source string (strip comments, docstrings and
    blank lines, collapse multi-line constructs onto one line), then convert
    it into tree/JSON form via group_tree_data + api_code_transform_to_tree.
    kwargs: code_string, PythonCodeTreeNode, PythonCodeTreeController
    """
    code_string = kwargs["code_string"]
    # Strip trailing '#' comments.
    code_string = re.sub(r'\s+#.*$', "", code_string, flags=re.M)
    # Strip triple-quoted docstrings. Bug fix: the original passed
    # `flags=re.M and re.S` — a boolean `and` that evaluates to re.S alone;
    # the intended combination is the bitwise OR of both flags.
    code_string = re.sub(r'\'\'\'(.*?)\'\'\'|"""(.*?)"""', "", code_string, flags=re.M | re.S)
    # Drop blank lines.
    code_string = re.sub(r'\n\s*\n', '\n', code_string)
    # Collapse constructs split across physical lines (trailing commas,
    # open brackets, closing braces, ...) back onto one line.
    filter_str = r',\n\s+|\{\n\s+|\",\n\s+|\n\s+\"|\n\s+\}|\[\n\s+|\n\s+\]|\}\n\}\)|\(\n\s+[A-z0-9]|[A-z0-9]\n\}|({)'
    filter_code_list = re.finditer(filter_str, code_string)
    offset = 0
    # finditer lazily scans the string object captured above; `offset` maps
    # its positions onto the progressively shortened working string.
    for ss in filter_code_list:
        start_index = ss.start() - offset
        end_index = ss.end() - offset
        match_pattern = r'\n|\s+'
        new_code_string = code_string[:start_index] + re.sub(match_pattern, "",
                                                             code_string[start_index:end_index]) + code_string[
                                                                                                       end_index:]
        offset += len(code_string) - len(new_code_string)
        code_string = new_code_string
    # Build the grouped line data, then the final tree configuration.
    code_setting = code_string.split("\n")
    progress_data = self.group_tree_data(code_list=code_setting)
    syntax_config = self.api_code_transform_to_tree(group_data=progress_data,
                                                    PythonCodeTreeNode=kwargs['PythonCodeTreeNode'],
                                                    PythonCodeTreeController=kwargs['PythonCodeTreeController'])
    return syntax_config
"""
python树操作
---------------------------------------------------------------------------------------------------------------------
"""
def py_tree_node_init(self, data, parent=None, last_node=None, indentation=0, serial_number=0, extent_data=None):
    """
    Initialize a python-code tree node: payload, parent link, empty child
    list, indentation depth, serial number and optional extra data.
    A fresh dict is used when extent_data is None (avoids the shared
    mutable-default pitfall).
    """
    self.data = data
    self.parent = parent
    self.children = []
    self.indentation = indentation
    self.extent_data = {} if extent_data is None else extent_data
    self.serial_number = serial_number
    self.last_node = last_node
# create
@classmethod
def add_data(cls, node):
    """Attach *node* to the tree by appending it to its parent's children."""
    siblings = node.parent.children
    siblings.append(node)
# delete
@classmethod
def del_data(cls):
    """Remove a node from the tree. TODO: not implemented yet."""
    pass
# update
@classmethod
def edit_data(cls):
    """Modify a node in the tree. TODO: not implemented yet."""
    pass
# lookup
@classmethod
def find_data(cls, root_node, i, s):
    """
    Depth-first search for the node whose indentation == i and
    serial_number == s; returns the node, or None when absent.
    """
    if root_node.indentation == i and root_node.serial_number == s:
        return root_node
    for child in root_node.children:
        match = cls.find_data(child, i, s)
        if match:
            return match
    return None
@classmethod
def find_data_by_path(cls, root_node, path, depth=0):
    """
    Walk the tree along a '/'-separated path, matching each segment against
    child.data["name"]. Empty segments (leading '/', '//') are skipped.
    Returns the reached node, or None when root_node is None or a segment
    has no matching child. (`depth` is unused, kept for interface parity.)
    """
    if root_node is None:
        return None
    node = root_node
    for segment in path.split('/'):
        if not segment:
            continue  # skip empty components, e.g. a leading '/'
        for child in node.children:
            if child.data["name"] == segment:
                node = child
                break
        else:
            return None  # segment not found among the current children
    return node
# serialize (recursive)
@classmethod
def to_array(cls, node):
    """
    Serialize a node subtree into nested dicts: a copy of node.data plus
    indentation, serial_number and recursively serialized children.
    """
    serialized = {}
    if not isinstance(node, str):
        serialized = dict(node.data)
        serialized["indentation"] = node.indentation
        serialized["serial_number"] = node.serial_number
    serialized["children"] = [cls.to_array(child) for child in node.children]
    return serialized
# serialize (single node, no children)
@classmethod
def to_array2(cls, node):
    """
    Serialize a single node into a flat dict (data + indentation +
    serial_number); string inputs yield an empty dict.
    """
    if isinstance(node, str):
        return {}
    flat = dict(node.data)
    flat["indentation"] = node.indentation
    flat["serial_number"] = node.serial_number
    return flat
@classmethod
def to_string(cls, code_json):
    """
    Render the serialized code tree back into indented python source.
    The synthetic 'Root' node emits nothing; each other node emits
    (syntax_indentation - 4) spaces, its statement and a newline, followed
    by its children depth-first.
    """
    pieces = []
    if code_json['syntax_statement'] != 'Root':
        indent = " " * (code_json["syntax_indentation"] - 4)
        pieces.append(indent + code_json['syntax_statement'] + "\n")
    for child in code_json['children']:
        pieces.append(cls.to_string(child))
    return "".join(pieces)
"""
信息安全模块
"""
@staticmethod
def enhanced_service_scan(target, ports):
    """
    Run an nmap version scan (-sV) against target:ports with MySQL/SSH/HTTP
    NSE scripts, returning a dict with the host, protocol, a `columns` table
    spec and a `result` list of port rows (port/state/service/product/version).
    NOTE(review): the host/"Protocol"/"columns"/"result" keys are overwritten
    on every host/protocol iteration, so only the last combination survives
    in the returned dict — confirm this is intended.
    :raises nmap.nmap.PortScannerError, Exception: re-raised unchanged.
    """
    nm = nmap.PortScanner()
    return_dict = {}
    try:
        # Version detection plus targeted NSE scripts; the embedded
        # whitespace/newlines are tolerated by nmap's argument parsing.
        scan_args = '''
        -sV
        --script=mysql-info,ssh-hostkey,http-headers
        --script-args=ssh-hostkey=all
        '''
        nm.scan(hosts=target, ports=ports, arguments=scan_args)
        for host in nm.all_hosts():
            return_dict.update({host: host})
            for proto in nm[host].all_protocols():
                return_dict.update({"Protocol": proto})
                # Frontend table column definitions.
                return_dict.update(
                    {"columns": [{"label": "端口", "prop": "port"}, {"label": "状态", "prop": "state"},
                                 {"label": "服务", "prop": "Service"}, {"label": "产品", "prop": "Product"},
                                 {"label": "版本", "prop": "Version"}]})
                data_list = []
                for port, data in nm[host][proto].items():
                    # Fall back to 'N/A' when nmap reports empty product/version.
                    product = data.get('product', '').strip() or 'N/A'
                    version = data.get('version', '').strip() or 'N/A'
                    data_list.append({
                        "port": port,
                        "state": data['state'],
                        "Service": data['name'],
                        "Product": product,
                        "Version": version
                    })
                return_dict.update({"result": data_list})
    except nmap.nmap.PortScannerError as e:
        raise e
    except Exception as e:
        raise e
    return return_dict
@staticmethod
def sqlapi(options):
    """
    Drive a local sqlmap REST API server (hardcoded 127.0.0.1:8775):
    create a task, push the scan options, start the scan, then poll the
    status every 5 seconds and return the scan data JSON once it is no
    longer 'running'.
    :param options: dict providing url/cookie/getCurrentDb/getCurrentUser/
                    getDbs/getTables/getColumns forwarded to sqlmap.
    NOTE(review): implicitly returns None when task creation or option
    setting fails, and polls forever if the scan never finishes — confirm
    callers tolerate both.
    """
    header = {
        'Content-Type': 'application/json'
    }
    # Options payload sent to both /option/set and /scan/start.
    data = {
        "url": options["url"],
        "cookie": options["cookie"],
        "getCurrentDb": options["getCurrentDb"],
        "getCurrentUser": options["getCurrentUser"],
        "getDbs": options["getDbs"],
        "getTables": options["getTables"],
        "getColumns": options["getColumns"]
    }
    # 1. Create a new scan task.
    task_new_url = "http://127.0.0.1:8775/task/new"
    res = requests.get(task_new_url)
    taskId = res.json()['taskid']
    if 'success' in res.content.decode('utf-8'):
        # 2. Attach the scan options to the task.
        task_set_url = 'http://127.0.0.1:8775/option/' + taskId + '/set'
        task_set_res = requests.post(task_set_url, data=json.dumps(data), headers=header)
        if 'success' in task_set_res.content.decode('utf-8'):
            # 3. Start the scan.
            task_start_url = 'http://127.0.0.1:8775/scan/' + taskId + '/start'
            task_start_res = requests.post(task_start_url, data=json.dumps(data), headers=header)
            if ('success' in task_start_res.content.decode('utf-8')):
                # 4. Poll every 5 seconds until the scan leaves 'running'.
                while 1:
                    task_status_url = 'http://127.0.0.1:8775/scan/' + taskId + '/status'
                    time.sleep(5)
                    task_status_res = requests.get(task_status_url)
                    if ('running' in task_status_res.content.decode('utf-8')):
                        pass
                    else:
                        # 5. Fetch and return the scan results.
                        task_data_url = 'http://127.0.0.1:8775/scan/' + taskId + '/data'
                        task_data_res = requests.get(task_data_url)
                        return task_data_res.json()
            else:
                return "sql未安装"
"""
数据库模块
"""
@staticmethod
def get_data_from_db(host, user, pwd, name, pack_dir):
    """
    Dump MySQL database `name` into pack_dir/<name>_<timestamp>.sql via mysqldump.
    :param host: MySQL server host
    :param user: MySQL user
    :param pwd: MySQL password
    :param name: database name
    :param pack_dir: folder receiving the dump file
    :raises Exception: when mysqldump exits with a non-zero return code.
    NOTE(review): --password on the command line is visible to other local
    users in the process list; prefer MYSQL_PWD or a defaults file.
    """
    # Timestamped file name so successive backups never overwrite each other.
    timestamp = datetime.datetime.now().strftime("%Y%m%d-%H%M%S")
    backup_file = os.path.join(pack_dir, f"{name}_{timestamp}.sql")
    try:
        # Stream mysqldump's stdout straight into the backup file.
        command = [
            "mysqldump",
            f"--host={host}",
            f"--user={user}",
            f"--password={pwd}",
            name
        ]
        with open(backup_file, "w") as output_file:
            process = subprocess.Popen(command, stdout=output_file, stderr=subprocess.PIPE)
            _, stderr = process.communicate()
            if process.returncode != 0:
                raise Exception(f"Backup failed: {stderr.decode()}")
            print(f"Backup successful! File saved to: {backup_file}")
    except Exception as e:
        raise e
@staticmethod
def check_modsecurity_config():
    """
    Probe the known Apache config locations for an active ModSecurity
    configuration (CRS include or SecRuleEngine On). Prints progress and
    returns True on the first hit, False when none matches.
    """
    candidate_paths = (
        "/etc/apache2/mods-enabled/security2.conf",       # Debian/Ubuntu
        "/etc/apache2/mods-available/mod-security.conf",  # possible custom location
        "/etc/httpd/conf.d/mod_security.conf",            # CentOS/RHEL
    )
    for candidate in candidate_paths:
        try:
            with open(candidate, "r") as handle:
                text = handle.read()
        except FileNotFoundError:
            print(f"[-] 配置文件不存在: {candidate}")
            continue
        except Exception as exc:
            print(f"读取配置文件失败: {exc}")
            continue
        # Key markers: CRS include directive or the rule engine switch.
        if "Include /etc/modsecurity/*.conf" in text or "SecRuleEngine On" in text:
            print(f"[+] ModSecurity 配置文件已加载: {candidate}")
            return True
    print("[-] 未找到有效的 ModSecurity 配置")
    return False
@staticmethod
def check_modsecurity_installed():
    """
    Return True when Apache's module listing reports security2_module,
    False on a failed listing command or when the module is absent.
    """
    try:
        # Debian/Ubuntu module listing; CentOS/RHEL would use ["httpd", "-M"].
        listing = subprocess.run(
            ["apache2ctl", "-M"],
            capture_output=True,
            text=True,
            check=True
        )
    except subprocess.CalledProcessError as exc:
        print(f"检查失败: {exc}")
        return False
    if "security2_module" in listing.stdout:
        print("[+] ModSecurity 模块已安装")
        return True
    print("[-] ModSecurity 模块未安装")
    return False
@staticmethod
def check_owasp_crs():
    """
    Return True if any known OWASP CRS rule file can be opened,
    False otherwise. Prints a status line per probed path.
    """
    rule_files = (
        "/etc/modsecurity/crs-setup.conf",  # OWASP CRS setup file
        "/etc/modsecurity/ors_rules/rules/REQUEST-942-APPLICATION-ATTACK-SQLI.conf",  # SQLi rules
    )
    for rule_file in rule_files:
        try:
            # Opening without reading — effectively an existence/permission probe.
            with open(rule_file, "r"):
                pass
        except FileNotFoundError:
            print(f"[-] OWASP CRS 规则文件未找到: {rule_file}")
            continue
        except Exception as exc:
            print(f"读取规则文件失败: {exc}")
            continue
        print(f"[+] OWASP CRS 规则文件存在: {rule_file}")
        return True
    return False
def detect_os_by_file(self):
    """
    Detect the Linux distribution family via /etc/os-release
    (Python 3.7 compatible).
    :return: 'debian' or 'centos'
    :raises NotImplementedError: on non-Linux platforms
    :raises RuntimeError: when detection fails or the distro is unsupported
    """
    # Refuse outright on anything that is not Linux.
    if platform.system().lower() != "linux":
        raise NotImplementedError("仅支持 Linux 系统")
    release_path = "/etc/os-release"
    if not os.path.exists(release_path):
        release_path = "/usr/lib/os-release"  # fallback location
    try:
        with open(release_path, "r") as handle:
            raw = handle.read()
        # Parse KEY=VALUE pairs (e.g. ID=ubuntu), dropping comments.
        info = {}
        for line in raw.splitlines():
            if "=" in line and not line.startswith("#"):
                key, _, value = line.partition("=")
                info[key.strip().lower()] = value.strip().strip('"')
        distro_id = info.get("id", "")
        if "debian" in distro_id or "ubuntu" in distro_id:
            return "debian"
        if "centos" in distro_id or "rhel" in distro_id:
            return "centos"
        raise ValueError(f"不支持的操作系统: {distro_id}")
    except Exception as e:
        # Any failure (including the unsupported-distro ValueError above)
        # is surfaced as a RuntimeError, matching the original contract.
        raise RuntimeError(f"无法检测操作系统: {str(e)}")
# OS detection
def detect_os(self):
    """
    Map detect_os_by_file()'s output onto 'debian'/'centos'.
    Prints a warning and implicitly returns None for anything else.
    """
    info = self.detect_os_by_file().lower()
    if 'ubuntu' in info or 'debian' in info:
        return 'debian'
    if 'centos' in info or 'rhel' in info:
        return 'centos'
    print("[-] 不支持的操作系统")
# Install ModSecurity for the detected distro family
def install_modsecurity(self, os_type):
    """
    Install ModSecurity via apt (debian) or yum (centos).
    :param os_type: 'debian' or 'centos' (anything else runs no command
                    and still reports success, as before)
    :return: True on success, False when a package command fails
    """
    commands = {
        'debian': [["apt-get", "update"],
                   ["apt-get", "install", "-y", "libapache2-mod-security2"]],
        'centos': [["yum", "install", "-y", "mod_security"]],
    }
    try:
        for command in commands.get(os_type, []):
            subprocess.run(command, check=True)
        print("[+] ModSecurity 安装成功")
        return True
    except subprocess.CalledProcessError as err:
        print(f"[-] 安装失败: {err}")
        return False
# Configure ModSecurity
def configure_modsecurity(self, os_type):
    """
    Write a baseline ModSecurity Apache config (engine on, audit/debug logs,
    OWASP CRS includes) to the distro-specific path.
    :param os_type: 'debian' or 'centos'
    :return: True on success, False on any write failure
    NOTE(review): any other os_type leaves config_path == "", so open() fails
    and the method returns False. The triple-quoted config is written with
    its leading indentation intact — Apache tolerates it, but verify.
    """
    config_path = ""
    if os_type == 'debian':
        config_path = "/etc/apache2/mods-enabled/security2.conf"
    elif os_type == 'centos':
        config_path = "/etc/httpd/conf.d/mod_security.conf"
    # Baseline rules: engine enabled, audit/debug logging, CRS includes.
    config_content = """
    SecRuleEngine On
    SecAuditLog /var/log/modsec_audit.log
    SecDebugLog /var/log/modsec_debug.log
    SecDebugLogLevel 0
    Include /etc/modsecurity/ors_rules/crs-setup.conf
    Include /etc/modsecurity/ors_rules/rules/*.conf
    """
    try:
        with open(config_path, "w") as f:
            f.write(config_content)
        print(f"[+] 配置文件已写入: {config_path}")
        return True
    except Exception as e:
        print(f"[-] 写入配置失败: {e}")
        return False
# Install the OWASP Core Rule Set
def install_owasp_crs(self):
    """
    Clone the OWASP CRS repository into /etc/modsecurity/ors_rules
    (replacing any previous checkout) and activate crs-setup.conf.
    :return: True on success, False on any failure
    """
    crs_dir = "/etc/modsecurity/ors_rules"
    crs_repo = "https://github.com/coreruleset/coreruleset"
    try:
        # Start from a clean directory: wipe any previous checkout.
        if os.path.exists(crs_dir):
            print(f"检测到已存在的CRS目录 {crs_dir},执行清理...")
            shutil.rmtree(crs_dir)
            print("旧CRS目录已成功删除")
        os.makedirs(crs_dir, exist_ok=True)
        print(f"创建CRS目录 {crs_dir}")
        # Fetch the rules and enable the example setup file.
        subprocess.run(["git", "clone", crs_repo, crs_dir], check=True)
        subprocess.run(["cp", crs_dir + "/crs-setup.conf.example", crs_dir + "/crs-setup.conf"], check=True)
        print("[+] OWASP CRS 规则集安装成功")
        return True
    except Exception as err:
        print(f"[-] 安装 OWASP CRS 失败: {err}")
        return False
def detect_main(self):
    """
    End-to-end ModSecurity setup: ensure the Apache module is installed,
    (re)write its configuration, install the OWASP CRS when the SQLi rule
    file is missing, then restart Apache.
    :return: True when Apache restarts cleanly, False otherwise.
    """
    os_type = self.detect_os()
    # 1. Check whether the ModSecurity module is loaded; install it if not.
    try:
        if os_type == 'debian':
            result = subprocess.run(["apache2ctl", "-M"], capture_output=True, text=True)
        else:
            result = subprocess.run(["httpd", "-M"], capture_output=True, text=True)
        if "security2_module" not in result.stdout:
            print("[-] ModSecurity 未安装,开始安装...")
            if not self.install_modsecurity(os_type):
                raise Exception("[-] ModSecurity 安装失败")
    except FileNotFoundError:
        # NOTE(review): execution deliberately continues even when Apache
        # itself is missing — the later restart will then fail; verify intent.
        print("[-] Apache 未安装,请先安装 Apache")
    # 2. (Re)write the baseline configuration file.
    if not self.configure_modsecurity(os_type):
        print("[-] 未配置")
    # 3. Install the OWASP CRS rule set when the SQLi rule file is absent.
    # NOTE(review): this probe path differs from the ors_rules path used by
    # install_owasp_crs, so the install may re-run every time — verify.
    crs_rule_file = "/etc/modsecurity/rules/REQUEST-942-APPLICATION-ATTACK-SQLI.conf"
    if not os.path.exists(crs_rule_file):
        print("[-] OWASP CRS 未安装,开始安装...")
        if not self.install_owasp_crs():
            print("[-] OWASP CRS 安装失败")
    # Restart Apache so the new configuration takes effect.
    try:
        if os_type == 'debian':
            subprocess.run(["systemctl", "restart", "apache2"], check=True)
        else:
            subprocess.run(["systemctl", "restart", "httpd"], check=True)
        print("[+] Apache 已重启,配置生效")
        return True
    except subprocess.CalledProcessError as e:
        print(f"[-] 重启 Apache 失败: {e}")
        return False
def check_ufw_status(self):
    """
    Query ufw's status.
    :return: True when active, False on inactive/command failure,
             None when ufw is not installed.
    """
    try:
        output = subprocess.check_output(
            ["sudo", "ufw", "status"],
            stderr=subprocess.STDOUT,
            text=True
        )
    except subprocess.CalledProcessError:
        return False
    except FileNotFoundError:
        return None  # ufw not installed
    return "Status: active" in output
def get_ufw_rules(self):
    """
    Return the verbose ufw rule listing as a string, or an error /
    not-installed message when the command fails.
    """
    try:
        return subprocess.check_output(
            ["sudo", "ufw", "status", "verbose"],
            stderr=subprocess.STDOUT,
            text=True
        )
    except subprocess.CalledProcessError as err:
        return f"错误: {err.output}"
    except FileNotFoundError:
        return "ufw 未安装"
def block_ip_port(self, ip, port):
    """
    Add a ufw rule denying traffic from `ip` to any address on `port`.
    :return: True on success, False when the ufw command fails.
    """
    deny_rule = ["sudo", "ufw", "deny", "from", ip, "to", "any", "port", str(port)]
    try:
        subprocess.check_call(deny_rule)
    except subprocess.CalledProcessError as err:
        print(f"失败: {err}")
        return False
    return True
def open_ufw(self):
    """
    Enable the ufw firewall.
    :return: True on success, False when the ufw command fails.
    """
    try:
        subprocess.check_call(["sudo", "ufw", "enable"])
    except subprocess.CalledProcessError as err:
        print(f"失败: {err}")
        return False
    return True
# Apache "combined" log format regex. Bug fix: the named groups had been
# mangled to bare "(?P" (the "<name>" parts were stripped), which is not a
# valid pattern and fails to compile. Names restored; analyze_log consumes
# 'remote', 'method' and 'url' — the remaining group names are reconstructed
# from the combined-log field order.
LOG_REGEX = re.compile(
    r'(?P<remote>[\.0-9]+) (?P<host>[\w\d\-]+) (?P<user>[\w\d\-]+) '
    r'\[(?P<time>.*?)\] "(?P<method>\w+) (?P<url>.*?) (?P<protocol>[\w\/\.\d]+)" '
    r'(?P<status>\d+) (?P<size>[\d-]+) "(?P<referer>.*?)" "(?P<useragent>.*?)"')
# Broad SQL-injection detection pattern: matches SQL verbs (select/insert/
# update/delete/drop/truncate/alter/exec...), logical operators combined
# with comparisons, comment sequences (/* */ and --), optionally repeated
# and ';'-separated. NOTE(review): deliberately aggressive — expect false
# positives on ordinary query strings.
INJECTION_PATTERNS = re.compile(
    r"(\s*([\0\b\'\"\n\r\t\%\_\\]*\s*(((select\s*.+\s*from\s*.+)|(insert\s*.+\s*into\s*.+)|(update\s*.+\s*set\s*.+)|(delete\s*.+\s*from\s*.+)|(drop\s*.+)|(truncate\s*.+)|(alter\s*.+)|(exec\s*.+)|(\s*(all|any|not|and|between|in|like|or|some|contains|containsall|containskey)\s*.+[\=\>\<=\!\~]+.+)|(let\s+.+[\=]\s*.*)|(begin\s*.*\s*end)|(\s*[\/\*]+\s*.*\s*[\*\/]+)|(\s*(\-\-)\s*.*\s+)|(\s*(contains|containsall|containskey)\s+.*)))(\s*[\;]\s*)*)+)"
)
def decode_url(self, encoded_url):
    """
    Undo multiple layers of URL (percent) encoding until the text stops
    changing, then return it lower-cased.
    """
    text = unquote(encoded_url)
    while '%' in text:
        next_text = unquote(text)
        if next_text == text:
            break  # stable: remaining '%' are literal characters
        text = next_text
    return text.lower()
def analyze_log(self, log_path):
    """
    Scan an Apache combined-format access log for SQL-injection attempts.
    Non-matching lines are skipped; each matching line's URL is fully
    URL-decoded and tested against INJECTION_PATTERNS.
    :param log_path: path to the access log
    :return: list of (ip, hit_count) tuples, sorted by hit count descending
    """
    hits = defaultdict(int)
    with open(log_path, 'r', errors='ignore') as log_file:
        for raw_line in log_file:
            parsed = LOG_REGEX.match(raw_line.strip())
            if parsed is None:
                continue  # skip lines that do not match the log format
            client_ip = parsed.group('remote')
            request_url = parsed.group('url')
            # Decode (possibly multi-encoded) URL before pattern matching.
            if INJECTION_PATTERNS.search(self.decode_url(request_url)):
                hits[client_ip] += 1
    return sorted(hits.items(), key=lambda pair: pair[1], reverse=True)