🔌插件系统实现
PART 1:PluginManager插件管理器
目的:实现多个插件的同时运行,插件的加载,插件的注册,插件信息的展示
根据用户选择,加载对应插件
列出对应插件参数
多线程同时运行多个插件
展示插件运行结果
消息队列处理多个插件返回结果
自行导入插件,同时安装插件依赖python库
加载类功能:
check函数用于遍历plugins文件夹中所有插件,判断是否所有插件都在Plugins.json中注册,如果未注册就交给注册类注册
# 加载类
class Load:
def __init__(self,Register) -> None:
self.Register=Register
pass
# 检测插件
def check(self):
# 读取 Plugins.json 文件
with open("myproject\PluginSystem\Plugins.json", 'r') as f:
registered_plugins = [i["name"] for i in json.load(f)['PluginList']]
# print(registered_plugins)
# 扫描子文件夹 Plugins 下的所有文件夹,找到其中的插件
plugins = []
for folder_name in os.listdir('myproject\PluginSystem\Plugins'):
folder_path = os.path.join('myproject\PluginSystem\Plugins', folder_name)
if os.path.isdir(folder_path) and os.path.isfile(os.path.join(folder_path, 'info.json')):
plugins.append(folder_name)
# print(plugins)
# 检查每个插件是否都已经在 Plugins.json 中注册,为注册则注册该插件
for plugin in plugins:
if plugin not in registered_plugins:
# print(f'Error: Plugin {plugin} is not registered in Plugins.json')
self.Register.registe(plugin)
print("所有插件加载完成")
PART 2:MessageManager插件消息管理器
目的:实现一个监听线程处理插件返回的消息
处理插件返回的消息
处理插件返回的日志
储存插件返回的结果
监听多线程消息的实现原理:插件接口会接收来自各个插件返回的消息交给一个队列,开辟的监听线程会实时监听该队列,并且解析从队列中读取的消息,分类交给Recorder库将消息存储起来
import threading
import queue
from time import sleep
import os
import sys
# 获取上一级目录路径
parent_dir = os.path.dirname(os.path.dirname(os.path.abspath(__file__)))
sys.path.append(parent_dir)
from Recorder.Recorder import Recorder
TEST="TEST"
# 结束事件
all_plugins_end=threading.Event()
# 消息队列
message_queue=queue.Queue()
# message格式如下:
# a={
# "name":"PluginName",
# "type":"log",
# "level":"info",
# "messages":[]
# }
# 处理消息
def process_message(message):
# print(message)
if message.get("type","") == "log":
message_recorder.log(message)
elif message.get("type","") == "result":
message_recorder.result(message)
else:
print("[ERROR]未知消息类型:",message)
return
# 监听器——实现监听
def message_listener():
print("[+]监听创建成功")
while True:
# print(all_plugins_end.is_set(),message_queue.empty())
# 判断是否所有插件线程都结束了
if all_plugins_end.is_set() and message_queue.empty():
break
try:
message = message_queue.get(timeout=1)
# 处理消息
if message is None:
continue
process_message(message)
# 处理完成
message_queue.task_done()
except queue.Empty:
sleep(2)
pass
# 创建监听接收一个线程列表
def Build_Listener(ThreadList,pluginlist,project_name):
# print(ThreadList)
global message_recorder
message_recorder=Recorder(project_name)
message_recorder.creat_dir(pluginlist)
try:
# 创建并启动消息监听器线程
listener_thread = threading.Thread(target=message_listener)
listener_thread.daemon = True # 设置为守护线程,主线程结束时自动退出
listener_thread.start()
except Exception as e:
print("[ERROR]监听器创建失败:\n\t",e)
try:
# 等待所有线程执行完成
for i in ThreadList[0]:
i.join()
# 删除所有线程
for i in ThreadList[0]:
ThreadList[0].remove(i)
# 设置事件,表示所有插件线程都结束了
all_plugins_end.set()
except Exception as e:
print("[ERROR]监听线程错误:\n\t",e)
try:
# 等待消息队列中的消息都被处理完
message_queue.join()
print("[+]消息处理完成")
# 等待消息监听器线程结束
# listener_thread.join()
print("[+]所有线程运行完成,监听结束")
except Exception as e:
print("[ERROR]消息队列监听错误:\n\t",e)
return
# 插件接口函数,插件调用返回message
def Put_message(message):
# print(message)
message_queue.put(message)
PART 3:plugins.Interface插件接口
Interface.Trans_message(message)
向插件系统传递消息,插件系统会根据message处理消息
# 传递消息
def Trans_message(message):
mg.Put_message(message)
Get_HTMLExtractor(HTML="")
获取一个HTML解析器,HTML提取器将会解析html的input输入框,form表单,textarea多行输入框,接口使用实例
# 获取一个html解析类对象
def Get_HTMLExtractor(HTML=""):
Extractor = EH.HTMLExtractor(HTML)
return Extractor
Get_SelfDefExtractor(setting={})
获得一个自定义提取器,提取指定的html元素
# 获取一个自定义提取类对象
def Get_SelfDefExtractor(setting):
Extractor = ES.Self_Defining_Extractor(setting)
return Extractor
request_form(args)
针对使用HTMLExtractor.extract_form()
提取器提取的表单结果,发送请求的一个接口,返回一个响应
接收参数详解:
"""
request_form(args)参数:
args={
"form":{'form_name': 'form1', 'form_method': 'post', 'form_action': ' ', 'input_datas': [{'name': 'uname', 'type': 'text', 'id': None}, {'name': 'passwd', 'type': 'text', 'id': None}, {'name': 'submit', 'type': 'submit', 'id': None}]},
"payload":"",
"url":"",
"cookies":{},
"proxies":{},
"random_ua":True,
}
"""
具体实现:
# 针对使用form提取器提取结果进行请求,返回一个请求
def request_form(args):
# 设置UA头
headers = {
"User-Agent": "Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/58.0.3029.110 Safari/537.36"
}
# print("request_form:",args)
form = args.get("form", None)
# 如果没有参数则返回
if form == None:
print("[ERROR]request_get:form is None")
return None
# 判断是否随机ua
if args.get("random_ua", True):
ua = UserAgent()
headers["User-Agent"] = ua.random
# print("注入form表单")
params = {}
method = form.get("form_method", None)
# 解析form构造payload
def get_payload():
payload=args.get("payload", None)
# print(payload)
# url编码
# payload= urllib.parse.urlencode(payload)
# print(payload)
name = form.get("name", "id")
for i in form.get("input_datas", []):
if i.get("name").lower() == "submit" :
# 注意后续变动
params["submit"] = "Submit"
continue
name=i.get("name", "id")
params[name] = payload
return
get_payload()
# print("request_form:",params)
# 发送get请求
if method == "get":
# print("注入form表单,发送get请求")
def Get_method():
# print(params)
# print("\n发送post请求",form.get("cookies", None))
try:
req = requests.get(
allow_redirects=False,
url=args.get("url", "127.0.0.1"),
params=params,
headers=headers,
cookies=form.get("cookies", None),
proxies=form.get("proxies", None),
timeout=5,
)
return req
except Exception as e:
pass
return Get_method()
# 发送post请求
elif method == "post":
# print("注入form表单,发送Post请求")
def Post_method():
# print(params)
# print("\n发送post请求",form.get("cookies", None))
try:
# 修改
req = requests.post(
allow_redirects=False,
url=args.get("url", "127.0.0.1"),
data=params,
headers=headers,
cookies=form.get("cookies", None),
proxies=form.get("proxies", None),
timeout=5,
)
# print(req.status_code)
return req
except Exception as e:
pass
return Post_method()
else:
# print("[Error]注入form表单失败method错误")
return
request_input(args)
针对使用HTMLExtractor.extract_input()
提取器提取的input输入框结果,发送请求的一个接口,返回一个响应
"""
request_input(args)参数:
args={
"input":{'name': 'passwd', 'type': 'text', 'value': '', 'form': 'form1', 'method': 'post'},
"payload":"",
"url":"",
"cookies":{},
"proxies":{},
"random_ua":True,
}
"""
# 针对使用input提取器提取结果进行请求,返回一个请求
def request_input(args):
# 设置UA头
headers = {
"User-Agent": "Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/58.0.3029.110 Safari/537.36"
}
input = args.get("input", None)
# print("input:",input)
# 如果没有参数则返回
if input == None:
print("[ERROR]request_get:input is None")
return None
# 如果input属于一个表单则不进行请求
if input.get("form", None) is not None:
# print("input输入框属于一个表单不进行注入")
return None
# 设置随机ua头
if args.get("random_ua", True):
ua = UserAgent()
# print(ua.random)
headers["User-Agent"] = ua.random
# 获取input输入框的变量名name和请求方法method
name = input.get("name", "id")
method = input.get("method", None)
# 发送get请求
if method == "get":
def Get_method():
params = {}
params[name] = args.get("payload", None)
# print(params)
req = requests.get(
url=args.get("url", "127.0.0.1"),
params=params,
headers=headers,
cookies=args.get("cookies", None),
proxies=args.get("proxies", None),
timeout=(10, None),
allow_redirects=False,
)
# print(req)
return req
return Get_method()
# 发起post请求
elif method == "post":
def Post_method():
params = {}
params[name] = args.get("payload", None)
# print(params)
req = requests.post(
allow_redirects=False,
url=args.get("url", "127.0.0.1"),
data=params,
headers=headers,
cookies=input.get("cookies", None),
proxies=input.get("proxies", None),
timeout=5,
)
# print(req.request)
# print(req)
return req
return Post_method()
else:
return
Get_Storage()
获得一个Storage对象用于向对数据库的操作:1.创建表单 2.向指定表单插入数据
# 获取一个存储类对象
def Get_Storage():
StoreObj = storage.Storage_Base()
return StoreObj
GET_Recorder(project_name)
# 获取一个记录器对象
def GET_Recorder(project_name):
if project_name == None:
print("[ERROR]GET_Recorder:project_name is None")
return None
RecorderObj = Recorder(project_name)
return RecorderObj
Get_file(plugin,filename,type)
获得该插件的工作路径下的文件操作
# 获得插件工作地址的文件
def Get_file(plugin,filename,type):
file_dir=os.path.join(os.path.join(os.path.join(os.path.join(grandfather_dir,"History"),plugin.project_name),plugin.plugin_name),filename)
file = open(file_dir, type)
return file
Get_a_file(file_path,type)
获得一个文件:
# 获得一个文件
def Get_a_file(file_path,type):
try:
file = open(file_path, type)
return file
except Exception as e:
print(e)
pass
Get_url_from_scrapy(project_name)
从scrapy扫描结果载入url文件
def Get_url_from_scrapy(project_name):
file_dir=os.path.join(os.path.join(os.path.join(grandfather_dir,"History"),project_name),"urls.txt")
file = open(file_dir, "r")
return file
最后更新于
这有帮助吗?