🔌插件系统实现

插件系统由PluginManager、MessageManager、Interface插件接口和插件库四部分组成

PluginManager:由加载类、注册类、查找类、运行类、插件线程五个类实现。

MessageManager:由监听线程实现。

Interface插件接口:依赖模块实现的接口留给插件编写者调用,更方便的完成插件编写。

插件库:插件开发者实现的一系列插件,该系统有一定的插件编写规范。

PART 1:PluginManager插件管理器

目的:实现多个插件的同时运行,插件的加载,插件的注册,插件信息的展示

  • 根据用户选择,加载对应插件

  • 列出对应插件参数

  • 多线程同时运行多个插件

  • 展示插件运行结果

  • 消息队列处理多个插件返回结果

  • 自行导入插件,同时安装插件依赖python库

加载类功能:

check函数用于遍历plugins文件夹中所有插件,判断是否所有插件都在Plugins.json中注册,如果未注册就交给注册类注册

# 加载类
class Load:
    def __init__(self,Register) -> None:
        self.Register=Register
        pass
        
    # 检测插件
    def check(self):
        # 读取 Plugins.json 文件
        with open("myproject\PluginSystem\Plugins.json", 'r') as f:
            registered_plugins = [i["name"] for i in json.load(f)['PluginList']]
            # print(registered_plugins)

        # 扫描子文件夹 Plugins 下的所有文件夹,找到其中的插件
        plugins = []
        for folder_name in os.listdir('myproject\PluginSystem\Plugins'):
    
            folder_path = os.path.join('myproject\PluginSystem\Plugins', folder_name)
            if os.path.isdir(folder_path) and os.path.isfile(os.path.join(folder_path, 'info.json')):
                plugins.append(folder_name)
        # print(plugins)

        # 检查每个插件是否都已经在 Plugins.json 中注册,为注册则注册该插件
        for plugin in plugins:
            if plugin not in registered_plugins:
                # print(f'Error: Plugin {plugin} is not registered in Plugins.json')
                self.Register.registe(plugin)
        print("所有插件加载完成")

PART 2:MessageManager插件消息管理器

目的:实现一个监听线程处理插件返回的消息

  • 处理插件返回的消息

  • 处理插件返回的日志

  • 储存插件返回的结果

监听多线程消息的实现原理:插件接口会接收来自各个插件返回的消息交给一个队列,开辟的监听线程会实时监听该队列,并且解析从队列中读取的消息,分类交给Recorder库将消息存储起来

import threading
import queue
from time import sleep
import os
import sys
# 获取上一级目录路径
parent_dir = os.path.dirname(os.path.dirname(os.path.abspath(__file__)))
sys.path.append(parent_dir)
from Recorder.Recorder import Recorder

TEST="TEST"

# 结束事件
all_plugins_end=threading.Event()
# 消息队列
message_queue=queue.Queue()

# message格式如下:
#     a={
#         "name":"PluginName",
#         "type":"log",
#         "level":"info",
#         "messages":[]
#     }
# 处理消息
def process_message(message):
    # print(message)
    if message.get("type","") == "log":
        message_recorder.log(message)
    elif message.get("type","") == "result":
        message_recorder.result(message)
    else:
        print("[ERROR]未知消息类型:",message)
        return

# 监听器——实现监听
def message_listener():
    print("[+]监听创建成功")
    while True:
        # print(all_plugins_end.is_set(),message_queue.empty())
        # 判断是否所有插件线程都结束了
        if all_plugins_end.is_set() and message_queue.empty():
                break
        try:
            message = message_queue.get(timeout=1)
            # 处理消息
            if message is None:
                continue
            process_message(message)
            # 处理完成
            message_queue.task_done()

        except queue.Empty:
            sleep(2)
            pass

# 创建监听接收一个线程列表
def Build_Listener(ThreadList,pluginlist,project_name):
    # print(ThreadList)
    global message_recorder
    message_recorder=Recorder(project_name)
    message_recorder.creat_dir(pluginlist)
    try:
        # 创建并启动消息监听器线程
        listener_thread = threading.Thread(target=message_listener)
        listener_thread.daemon = True  # 设置为守护线程,主线程结束时自动退出
        listener_thread.start()
    except Exception as e:
        print("[ERROR]监听器创建失败:\n\t",e)

    try:
        # 等待所有线程执行完成
        for i in ThreadList[0]:
            i.join()
        # 删除所有线程
        for i in ThreadList[0]:
            ThreadList[0].remove(i)
        # 设置事件,表示所有插件线程都结束了
        all_plugins_end.set()
    except Exception as e:
            print("[ERROR]监听线程错误:\n\t",e)
    try:
        # 等待消息队列中的消息都被处理完
        message_queue.join()
        print("[+]消息处理完成")
        # 等待消息监听器线程结束
        # listener_thread.join()
        print("[+]所有线程运行完成,监听结束")
    except Exception as e:
        print("[ERROR]消息队列监听错误:\n\t",e)

    return 

# 插件接口函数,插件调用返回message
def Put_message(message):
    # print(message)
    message_queue.put(message)

PART 3:plugins.Interface插件接口

  • Interface.Trans_message(message)

向插件系统传递消息,插件系统会根据message处理消息

# 传递消息
def Trans_message(message):
    mg.Put_message(message)
  • Get_HTMLExtractor(HTML="")

获取一个HTML解析器,HTML提取器将会解析html的input输入框,form表单,textarea多行输入框,接口使用实例

# 获取一个html解析类对象
def Get_HTMLExtractor(HTML=""):
    Extractor = EH.HTMLExtractor(HTML)
    return Extractor
  • Get_SelfDefExtractor(setting={})

获得一个自定义提取器,提取指定的html元素

# 获取一个自定义提取类对象
def Get_SelfDefExtractor(setting):
    Extractor = ES.Self_Defining_Extractor(setting)
    return Extractor
  • request_form(args)

针对使用HTMLExtractor.extract_form()提取器提取的表单结果,发送请求的一个接口,返回一个响应

接收参数详解:

"""
request_form(args)参数:
    args={
        "form":{'form_name': 'form1', 'form_method': 'post', 'form_action': ' ', 'input_datas': [{'name': 'uname', 'type': 'text', 'id': None}, {'name': 'passwd', 'type': 'text', 'id': None}, {'name': 'submit', 'type': 'submit', 'id': None}]},
        "payload":"",
        "url":"",
        "cookies":{},
        "proxies":{},
        "random_ua":True,
    }
"""

具体实现:

# 针对使用form提取器提取结果进行请求,返回一个请求
def request_form(args):
    # 设置UA头
    headers = {
        "User-Agent": "Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/58.0.3029.110 Safari/537.36"
    }
    # print("request_form:",args)
    form = args.get("form", None)
    # 如果没有参数则返回
    if form == None:
        print("[ERROR]request_get:form is None")
        return None
    # 判断是否随机ua
    if args.get("random_ua", True):
        ua = UserAgent()
        headers["User-Agent"] = ua.random
    # print("注入form表单")
    params = {}
    method = form.get("form_method", None)
    # 解析form构造payload
    def get_payload():
        payload=args.get("payload", None)
        # print(payload)
        # url编码
        # payload= urllib.parse.urlencode(payload)
        # print(payload)
        name = form.get("name", "id")
        for i in form.get("input_datas", []):
            if  i.get("name").lower() == "submit" :
                # 注意后续变动
                params["submit"] = "Submit"
                continue
            name=i.get("name", "id")
            params[name] = payload
        return
    get_payload()
    # print("request_form:",params)

    # 发送get请求
    if method == "get":
        # print("注入form表单,发送get请求")
        def Get_method():
            # print(params)
            # print("\n发送post请求",form.get("cookies", None))
            try:
                req = requests.get(
                    allow_redirects=False,
                    url=args.get("url", "127.0.0.1"),
                    params=params,
                    headers=headers,
                    cookies=form.get("cookies", None),
                    proxies=form.get("proxies", None),
                    timeout=5,
                )
                return req
            except Exception as e:
                pass
            
        return Get_method()
    # 发送post请求
    elif method == "post":
        # print("注入form表单,发送Post请求")
        def Post_method():
            # print(params)
            # print("\n发送post请求",form.get("cookies", None))
            try:
                # 修改
                req = requests.post(
                    allow_redirects=False,
                    url=args.get("url", "127.0.0.1"),
                    data=params,
                    headers=headers,
                    cookies=form.get("cookies", None),
                    proxies=form.get("proxies", None),
                    timeout=5,
                )
                # print(req.status_code)
                return req
            except Exception as e:
                pass
            
        return Post_method()
    else:
        # print("[Error]注入form表单失败method错误")
        return
  • request_input(args)

针对使用HTMLExtractor.extract_input()提取器提取的input输入框结果,发送请求的一个接口,返回一个响应

"""
request_input(args)参数:
    args={
        "input":{'name': 'passwd', 'type': 'text', 'value': '', 'form': 'form1', 'method': 'post'},
        "payload":"",
        "url":"",
        "cookies":{},
        "proxies":{},
        "random_ua":True,
    }
"""
# 针对使用input提取器提取结果进行请求,返回一个请求
def request_input(args):
    # 设置UA头
    headers = {
        "User-Agent": "Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/58.0.3029.110 Safari/537.36"
    }
    input = args.get("input", None)
    # print("input:",input)
    # 如果没有参数则返回
    if input == None:
        print("[ERROR]request_get:input is None")
        return None
    # 如果input属于一个表单则不进行请求
    if input.get("form", None) is not None:
        # print("input输入框属于一个表单不进行注入")
        return None
    # 设置随机ua头
    if args.get("random_ua", True):
        ua = UserAgent()
        # print(ua.random)
        headers["User-Agent"] = ua.random

    # 获取input输入框的变量名name和请求方法method
    name = input.get("name", "id")
    method = input.get("method", None)
    # 发送get请求
    if method == "get":
        def Get_method():
            params = {}
            params[name] = args.get("payload", None)
            # print(params)
            req = requests.get(
                url=args.get("url", "127.0.0.1"),
                params=params,
                headers=headers,
                cookies=args.get("cookies", None),
                proxies=args.get("proxies", None),
                timeout=(10, None),
                allow_redirects=False,
            )
            # print(req)
            return req
        return Get_method()
    # 发起post请求
    elif method == "post":
        def Post_method():
            params = {}
            params[name] = args.get("payload", None)
            # print(params)
            req = requests.post(
                allow_redirects=False,
                url=args.get("url", "127.0.0.1"),
                data=params,
                headers=headers,
                cookies=input.get("cookies", None),
                proxies=input.get("proxies", None),
                timeout=5,
            )
            # print(req.request)
            # print(req)
            return req
        return Post_method()
    else:
        return
  • Get_Storage()

获得一个Storage对象用于向对数据库的操作:1.创建表单 2.向指定表单插入数据

# 获取一个存储类对象
def Get_Storage():
    StoreObj = storage.Storage_Base()
    return StoreObj
  • GET_Recorder(project_name)

# 获取一个记录器对象
def GET_Recorder(project_name):
    if project_name == None:
        print("[ERROR]GET_Recorder:project_name is None")
        return None
    RecorderObj = Recorder(project_name)
    return RecorderObj
  • Get_file(plugin,filename,type)

获得该插件的工作路径下的文件操作

# 获得插件工作地址的文件
def Get_file(plugin,filename,type):
    file_dir=os.path.join(os.path.join(os.path.join(os.path.join(grandfather_dir,"History"),plugin.project_name),plugin.plugin_name),filename)
    file = open(file_dir, type)
    return file
  • Get_a_file(file_path,type)

获得一个文件:

# 获得一个文件
def Get_a_file(file_path,type):
    try:
        file = open(file_path, type)
        return file
    except Exception as e:
        print(e)
        pass
  • Get_url_from_scrapy(project_name)

从scrapy扫描结果载入url文件

def Get_url_from_scrapy(project_name):
    file_dir=os.path.join(os.path.join(os.path.join(grandfather_dir,"History"),project_name),"urls.txt")
    file = open(file_dir, "r")
    return file

最后更新于