插件系统实现
插件系统由PluginManager、MessageManager、Interface插件接口和插件库四部分组成
PluginManager:由加载类、注册类、查找类、运行类、插件线程五个类实现。
MessageManager:由监听线程实现。
Interface插件接口:依赖模块实现的接口留给插件编写者调用,更方便的完成插件编写。
插件库:插件开发者实现的一系列插件,该系统有一定的插件编写规范。
PART 1:PluginManager插件管理器
目的:实现多个插件的同时运行,插件的加载,插件的注册,插件信息的展示
根据用户选择,加载对应插件
列出对应插件参数
多线程同时运行多个插件
展示插件运行结果
消息队列处理多个插件返回结果
自行导入插件,同时安装插件依赖python库
加载类功能:
check函数用于遍历plugins文件夹中所有插件,判断是否所有插件都在Plugins.json中注册,如果未注册就交给注册类注册
# 加载类
class Load:
def __init__(self,Register) -> None:
self.Register=Register
pass
# 检测插件
def check(self):
# 读取 Plugins.json 文件
with open("myproject\PluginSystem\Plugins.json", 'r') as f:
registered_plugins = [i["name"] for i in json.load(f)['PluginList']]
# print(registered_plugins)
# 扫描子文件夹 Plugins 下的所有文件夹,找到其中的插件
plugins = []
for folder_name in os.listdir('myproject\PluginSystem\Plugins'):
folder_path = os.path.join('myproject\PluginSystem\Plugins', folder_name)
if os.path.isdir(folder_path) and os.path.isfile(os.path.join(folder_path, 'info.json')):
plugins.append(folder_name)
# print(plugins)
# 检查每个插件是否都已经在 Plugins.json 中注册,为注册则注册该插件
for plugin in plugins:
if plugin not in registered_plugins:
# print(f'Error: Plugin {plugin} is not registered in Plugins.json')
self.Register.registe(plugin)
print("所有插件加载完成")
注册类功能:
registe(self, plugin_name):
函数接收插件名PluginName将对应的插件注册到Plugins.json中__add_PluginInfo(self,args={}):添加plugin信息
__find_info(self,PluginName):
找到plugin的信息,返回一个字典,字典中包含了插件的信息,例如名字,版本,状态等等。
__pip_install_requirements(self,file_path):
安装依赖库
is_in_pluginlist(self,plugin_name,plugin_version):
判断是否注册过该插件
# 注册类
class Register:
# 添加plugin信息
def __add_PluginInfo(self,args={}):
dic={}
try:
with open('myproject\PluginSystem\Plugins.json', 'r', encoding='utf-8') as f:
dic = json.load(f)
dic["PluginList"].append(args)
# print(json.dumps(dic, indent=4, ensure_ascii=False))
with open('myproject\PluginSystem\Plugins.json', 'w', encoding='utf-8') as f:
json.dump(dic, f, ensure_ascii=False, indent=4)
except Exception as e:
print(f"添加plugin信息错误,模块位置:{__file__},错误位置:PluginManager.Register.__add_PluginInfo(self,args)\n\t"+"错误信息:"+e)
# 找到plugin的信息,返回一个字典,字典中包含了插件的信息,例如名字,版本,状态等等。
def __find_info(self,PluginName):
dic={}
with open(f'myproject\PluginSystem\Plugins\{PluginName}\info.json', 'r', encoding='utf-8') as f:
dic = json.load(f)
# print(dic)
return dic
# 注册plugin,将插件添加到pluginlist中。
def registe(self, plugin_name):
info=self.__find_info(plugin_name)
version=info["version"]
if(self.is_in_pluginlist(plugin_name,version)):
print(f"已经注册过插件{plugin_name}")
# 抛出message
return
print(f"{plugin_name}注册中...")
self.__add_PluginInfo(info)
self.__pip_install_requirements(f'myproject\PluginSystem\Plugins\{plugin_name}\\requirements.txt')
# 安装依赖库
def __pip_install_requirements(self,file_path):
subprocess.check_call([sys.executable, "-m", "pip", "install", "-r", file_path, "-i", SETTING.PIP_SOURCE])
# 判断是否注册过该插件
def is_in_pluginlist(self,plugin_name,plugin_version):
dic={}
with open('myproject\PluginSystem\Plugins.json', 'r', encoding='utf-8') as f:
dic = json.load(f)
# print(dic)
for i in dic["PluginList"]:
if i["name"]==plugin_name:
if i["version"]==plugin_version:
return True
return False
List_all_info(self):
列出所有的plugin
List_all_name_version(self):
列出所有plugin的插件名和版本号
List_index_info(self,namelist):
列出指定namelist的plugin的info,namelist是一个列表,返回一个info列表
# 查找类,用于列出插件,及其相关信息,在使用时用p.List.List_index_info(["PluginB","asd","sad"])
class List:
def __init__(self) -> None:
pass
# 列出所有的plugin
def List_all_info(self):
dic={}
with open('myproject\PluginSystem\Plugins.json', 'r', encoding='utf-8') as f:
dic = json.load(f)
# print(dic)
return dic
# 列出所有plugin的插件名和版本号
def List_all_name_version(self):
dic={}
dic_name_version=[]
with open('myproject\PluginSystem\Plugins.json', 'r', encoding='utf-8') as f:
dic = json.load(f)
for i in dic["PluginList"]:
item={}
item["name"]=i["name"]
item["version"]=i["version"]
dic_name_version.append(item)
# print(dic_name_version)
return dic_name_version
# 列出指定namelist的plugin的info,namelist是一个列表,返回一个info列表
def List_index_info(self,namelist):
Info_List=[]
# print(List)
with open('myproject\PluginSystem\Plugins.json', 'r', encoding='utf-8') as f:
dic = json.load(f)
for d in dic["PluginList"]:
if d["name"] in namelist:
Info_List.append(d)
# print(Info_List)
return Info_List
run_plugins(self,ArgsList=None):
解析所有表单信息,根据指定参数运行对应插件
__Generator_PluginObj(self,name=None):
根据表单解析后的字典生成一个插件类(私有函数)
show_result(self):
展示运行结果调用插件的show_result()
# 运行类
class Run:
def __init__(self,project_name):
self.plugin_threadings=[]
self.project_name=project_name
self.plugin_obj=[]
# 解析所有表单信息,根据指定参数运行对应插件
def run_plugins(self,ArgsList=None):
if not ArgsList:
return
for i in ArgsList:
name=i["name"]
# 添加项目名称
i["project_name"]=self.project_name
# 创建插件对象
obj=self.__Generator_PluginObj(name)
if obj is None:
continue
threading=PluginThread(obj, i)
self.plugin_threadings.append(threading)
threading.start()
# 创建监听监听每一个插件返回的消息
# print("run_plugins:",[self.plugin_threadings])
Build_Listener(ThreadList=[self.plugin_threadings],pluginlist=ArgsList,project_name=self.project_name)
print("[+]所有插件运行结束")
# 根据表单解析后的字典生成一个插件类(私有函数)
def __Generator_PluginObj(self,name=None):
if not name:
return
# 解析出插件名
try:
plugin_module = importlib.import_module(f"Plugins.{name}.{name}")
# print(dir(plugin_module))
plugin_class = getattr(plugin_module,name)
plugin_obj=plugin_class()
# 加入对象列表
self.plugin_obj.append(plugin_obj)
return plugin_obj
except:
print(f"[ERROR]插件{name}创建错误,原因可能是{name}不存在,请检测是否存在Plugins/{name}.py下是否有以{name}命名的class,详情请查插件编写规范")
return None
def show_result(self):
for i in self.plugin_obj:
try:
i.show_result()
except:
pass
根据接收的插件对象,和运行参数创建一个线程运行该插件
class PluginThread(threading.Thread):
def __init__(self, plugin, args):
threading.Thread.__init__(self)
self.plugin = plugin
self.args=args
def run(self):
try:
self.plugin.run(self.args)
except:
print(f"插件{self.plugin.name}运行出错")
return
PART 2:MessageManager插件消息管理器
目的:实现一个监听线程处理插件返回的消息
处理插件返回的消息
处理插件返回的日志
储存插件返回的结果
监听多线程消息的实现原理:插件接口会接收来自各个插件返回的消息交给一个队列,开辟的监听线程会实时监听该队列,并且解析从队列中读取的消息,分类交给Recorder库将消息存储起来
import threading
import queue
from time import sleep
import os
import sys
# 获取上一级目录路径
parent_dir = os.path.dirname(os.path.dirname(os.path.abspath(__file__)))
sys.path.append(parent_dir)
from Recorder.Recorder import Recorder
TEST="TEST"
# 结束事件
all_plugins_end=threading.Event()
# 消息队列
message_queue=queue.Queue()
# message格式如下:
# a={
# "name":"PluginName",
# "type":"log",
# "level":"info",
# "messages":[]
# }
# 处理消息
def process_message(message):
# print(message)
if message.get("type","") == "log":
message_recorder.log(message)
elif message.get("type","") == "result":
message_recorder.result(message)
else:
print("[ERROR]未知消息类型:",message)
return
# 监听器——实现监听
def message_listener():
print("[+]监听创建成功")
while True:
# print(all_plugins_end.is_set(),message_queue.empty())
# 判断是否所有插件线程都结束了
if all_plugins_end.is_set() and message_queue.empty():
break
try:
message = message_queue.get(timeout=1)
# 处理消息
if message is None:
continue
process_message(message)
# 处理完成
message_queue.task_done()
except queue.Empty:
sleep(2)
pass
# 创建监听接收一个线程列表
def Build_Listener(ThreadList,pluginlist,project_name):
# print(ThreadList)
global message_recorder
message_recorder=Recorder(project_name)
message_recorder.creat_dir(pluginlist)
try:
# 创建并启动消息监听器线程
listener_thread = threading.Thread(target=message_listener)
listener_thread.daemon = True # 设置为守护线程,主线程结束时自动退出
listener_thread.start()
except Exception as e:
print("[ERROR]监听器创建失败:\n\t",e)
try:
# 等待所有线程执行完成
for i in ThreadList[0]:
i.join()
# 删除所有线程
for i in ThreadList[0]:
ThreadList[0].remove(i)
# 设置事件,表示所有插件线程都结束了
all_plugins_end.set()
except Exception as e:
print("[ERROR]监听线程错误:\n\t",e)
try:
# 等待消息队列中的消息都被处理完
message_queue.join()
print("[+]消息处理完成")
# 等待消息监听器线程结束
# listener_thread.join()
print("[+]所有线程运行完成,监听结束")
except Exception as e:
print("[ERROR]消息队列监听错误:\n\t",e)
return
# 插件接口函数,插件调用返回message
def Put_message(message):
# print(message)
message_queue.put(message)
PART 3:plugins.Interface插件接口
Interface.Trans_message(message)
向插件系统传递消息,插件系统会根据message处理消息
# 传递消息
def Trans_message(message):
mg.Put_message(message)
Get_HTMLExtractor(HTML="")
获取一个HTML解析器,HTML提取器将会解析html的input输入框,form表单,textarea多行输入框,接口使用实例
# 获取一个html解析类对象
def Get_HTMLExtractor(HTML=""):
Extractor = EH.HTMLExtractor(HTML)
return Extractor
Get_SelfDefExtractor(setting={})
获得一个自定义提取器,提取指定的html元素
# 获取一个自定义提取类对象
def Get_SelfDefExtractor(setting):
Extractor = ES.Self_Defining_Extractor(setting)
return Extractor
request_form(args)
针对使用HTMLExtractor.extract_form()
提取器提取的表单结果,发送请求的一个接口,返回一个响应
接收参数详解:
"""
request_form(args)参数:
args={
"form":{'form_name': 'form1', 'form_method': 'post', 'form_action': ' ', 'input_datas': [{'name': 'uname', 'type': 'text', 'id': None}, {'name': 'passwd', 'type': 'text', 'id': None}, {'name': 'submit', 'type': 'submit', 'id': None}]},
"payload":"",
"url":"",
"cookies":{},
"proxies":{},
"random_ua":True,
}
"""
具体实现:
# 针对使用form提取器提取结果进行请求,返回一个请求
def request_form(args):
# 设置UA头
headers = {
"User-Agent": "Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/58.0.3029.110 Safari/537.36"
}
# print("request_form:",args)
form = args.get("form", None)
# 如果没有参数则返回
if form == None:
print("[ERROR]request_get:form is None")
return None
# 判断是否随机ua
if args.get("random_ua", True):
ua = UserAgent()
headers["User-Agent"] = ua.random
# print("注入form表单")
params = {}
method = form.get("form_method", None)
# 解析form构造payload
def get_payload():
payload=args.get("payload", None)
# print(payload)
# url编码
# payload= urllib.parse.urlencode(payload)
# print(payload)
name = form.get("name", "id")
for i in form.get("input_datas", []):
if i.get("name").lower() == "submit" :
# 注意后续变动
params["submit"] = "Submit"
continue
name=i.get("name", "id")
params[name] = payload
return
get_payload()
# print("request_form:",params)
# 发送get请求
if method == "get":
# print("注入form表单,发送get请求")
def Get_method():
# print(params)
# print("\n发送post请求",form.get("cookies", None))
try:
req = requests.get(
allow_redirects=False,
url=args.get("url", "127.0.0.1"),
params=params,
headers=headers,
cookies=form.get("cookies", None),
proxies=form.get("proxies", None),
timeout=5,
)
return req
except Exception as e:
pass
return Get_method()
# 发送post请求
elif method == "post":
# print("注入form表单,发送Post请求")
def Post_method():
# print(params)
# print("\n发送post请求",form.get("cookies", None))
try:
# 修改
req = requests.post(
allow_redirects=False,
url=args.get("url", "127.0.0.1"),
data=params,
headers=headers,
cookies=form.get("cookies", None),
proxies=form.get("proxies", None),
timeout=5,
)
# print(req.status_code)
return req
except Exception as e:
pass
return Post_method()
else:
# print("[Error]注入form表单失败method错误")
return
request_input(args)
针对使用HTMLExtractor.extract_input()
提取器提取的input输入框结果,发送请求的一个接口,返回一个响应
"""
request_input(args)参数:
args={
"input":{'name': 'passwd', 'type': 'text', 'value': '', 'form': 'form1', 'method': 'post'},
"payload":"",
"url":"",
"cookies":{},
"proxies":{},
"random_ua":True,
}
"""
# 针对使用input提取器提取结果进行请求,返回一个请求
def request_input(args):
# 设置UA头
headers = {
"User-Agent": "Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/58.0.3029.110 Safari/537.36"
}
input = args.get("input", None)
# print("input:",input)
# 如果没有参数则返回
if input == None:
print("[ERROR]request_get:input is None")
return None
# 如果input属于一个表单则不进行请求
if input.get("form", None) is not None:
# print("input输入框属于一个表单不进行注入")
return None
# 设置随机ua头
if args.get("random_ua", True):
ua = UserAgent()
# print(ua.random)
headers["User-Agent"] = ua.random
# 获取input输入框的变量名name和请求方法method
name = input.get("name", "id")
method = input.get("method", None)
# 发送get请求
if method == "get":
def Get_method():
params = {}
params[name] = args.get("payload", None)
# print(params)
req = requests.get(
url=args.get("url", "127.0.0.1"),
params=params,
headers=headers,
cookies=args.get("cookies", None),
proxies=args.get("proxies", None),
timeout=(10, None),
allow_redirects=False,
)
# print(req)
return req
return Get_method()
# 发起post请求
elif method == "post":
def Post_method():
params = {}
params[name] = args.get("payload", None)
# print(params)
req = requests.post(
allow_redirects=False,
url=args.get("url", "127.0.0.1"),
data=params,
headers=headers,
cookies=input.get("cookies", None),
proxies=input.get("proxies", None),
timeout=5,
)
# print(req.request)
# print(req)
return req
return Post_method()
else:
return
Get_Storage()
获得一个Storage对象用于向对数据库的操作:1.创建表单 2.向指定表单插入数据
# 获取一个存储类对象
def Get_Storage():
StoreObj = storage.Storage_Base()
return StoreObj
GET_Recorder(project_name)
# 获取一个记录器对象
def GET_Recorder(project_name):
if project_name == None:
print("[ERROR]GET_Recorder:project_name is None")
return None
RecorderObj = Recorder(project_name)
return RecorderObj
Get_file(plugin,filename,type)
获得该插件的工作路径下的文件操作
# 获得插件工作地址的文件
def Get_file(plugin,filename,type):
file_dir=os.path.join(os.path.join(os.path.join(os.path.join(grandfather_dir,"History"),plugin.project_name),plugin.plugin_name),filename)
file = open(file_dir, type)
return file
Get_a_file(file_path,type)
获得一个文件:
# 获得一个文件
def Get_a_file(file_path,type):
try:
file = open(file_path, type)
return file
except Exception as e:
print(e)
pass
Get_url_from_scrapy(project_name)
从scrapy扫描结果载入url文件
def Get_url_from_scrapy(project_name):
file_dir=os.path.join(os.path.join(os.path.join(grandfather_dir,"History"),project_name),"urls.txt")
file = open(file_dir, "r")
return file
最后更新于