# 插件系统实现

{% hint style="info" %}
插件系统由PluginManager、MessageManager、Interface插件接口和插件库四部分组成

**PluginManager：**&#x7531;加载类、注册类、查找类、运行类、插件线程五个类实现。

**MessageManager：**&#x7531;监听线程实现。

**Interface插件接口：**&#x4F9D;赖模块实现的接口留给插件编写者调用，更方便的完成插件编写。

**插件库：**&#x63D2;件开发者实现的一系列插件，该系统有一定的插件编写规范。
{% endhint %}

## **PART 1:PluginManager插件管理器**

<mark style="color:blue;">**目的：实现多个插件的同时运行，插件的加载，插件的注册，插件信息的展示**</mark>

* 根据用户选择，加载对应插件
* 列出对应插件参数
* 多线程同时运行多个插件
* 展示插件运行结果
* 消息队列处理多个插件返回结果
* 自行导入插件，同时安装插件依赖python库

{% tabs %}
{% tab title="加载类" %}
**加载类功能：**

check函数用于遍历plugins文件夹中所有插件，判断是否所有插件都在Plugins.json中注册，如果未注册就交给注册类注册

```python
# 加载类
class Load:
    def __init__(self,Register) -> None:
        self.Register=Register
        pass
        
    # 检测插件
    def check(self):
        # 读取 Plugins.json 文件
        with open("myproject\PluginSystem\Plugins.json", 'r') as f:
            registered_plugins = [i["name"] for i in json.load(f)['PluginList']]
            # print(registered_plugins)

        # 扫描子文件夹 Plugins 下的所有文件夹，找到其中的插件
        plugins = []
        for folder_name in os.listdir('myproject\PluginSystem\Plugins'):
    
            folder_path = os.path.join('myproject\PluginSystem\Plugins', folder_name)
            if os.path.isdir(folder_path) and os.path.isfile(os.path.join(folder_path, 'info.json')):
                plugins.append(folder_name)
        # print(plugins)

        # 检查每个插件是否都已经在 Plugins.json 中注册,为注册则注册该插件
        for plugin in plugins:
            if plugin not in registered_plugins:
                # print(f'Error: Plugin {plugin} is not registered in Plugins.json')
                self.Register.registe(plugin)
        print("所有插件加载完成")
```

{% endtab %}

{% tab title="注册类" %}
**注册类功能：**

`registe(self, plugin_name)：`\
函数接收插件名PluginName将对应的插件注册到Plugins.json&#x4E2D;**\_\_add\_PluginInfo(self,args={})**：添加plugin信息\
`__find_info(self,PluginName)：`\
找到plugin的信息，返回一个字典，字典中包含了插件的信息，例如名字，版本，状态等等。\
`__pip_install_requirements(self,file_path)：`\
安装依赖库\
`is_in_pluginlist(self,plugin_name,plugin_version)：`\
判断是否注册过该插件

```python
# 注册类
class Register:
    # 添加plugin信息
    def __add_PluginInfo(self,args={}):
        dic={}
        try:
            with open('myproject\PluginSystem\Plugins.json', 'r', encoding='utf-8') as f:
                dic = json.load(f)
                dic["PluginList"].append(args)
                # print(json.dumps(dic, indent=4, ensure_ascii=False))
            with open('myproject\PluginSystem\Plugins.json', 'w', encoding='utf-8') as f:
                json.dump(dic, f, ensure_ascii=False, indent=4)
        except Exception as e:
            print(f"添加plugin信息错误,模块位置:{__file__},错误位置:PluginManager.Register.__add_PluginInfo(self,args)\n\t"+"错误信息："+e)

    # 找到plugin的信息，返回一个字典，字典中包含了插件的信息，例如名字，版本，状态等等。
    def __find_info(self,PluginName):
        dic={}
        with open(f'myproject\PluginSystem\Plugins\{PluginName}\info.json', 'r', encoding='utf-8') as f:
            dic = json.load(f)
            # print(dic)
        return dic

    # 注册plugin，将插件添加到pluginlist中。
    def registe(self, plugin_name):
        info=self.__find_info(plugin_name)
        version=info["version"]
        if(self.is_in_pluginlist(plugin_name,version)):
            print(f"已经注册过插件{plugin_name}")
            # 抛出message
            return
        print(f"{plugin_name}注册中...")
        self.__add_PluginInfo(info)
        self.__pip_install_requirements(f'myproject\PluginSystem\Plugins\{plugin_name}\\requirements.txt')

    # 安装依赖库
    def __pip_install_requirements(self,file_path):
        subprocess.check_call([sys.executable, "-m", "pip", "install", "-r", file_path, "-i", SETTING.PIP_SOURCE])


    # 判断是否注册过该插件
    def is_in_pluginlist(self,plugin_name,plugin_version):
        dic={}
        with open('myproject\PluginSystem\Plugins.json', 'r', encoding='utf-8') as f:
            dic = json.load(f)
            # print(dic)
        for i in dic["PluginList"]:
            if i["name"]==plugin_name:
                if i["version"]==plugin_version: 
                    return True
        return False
```

{% endtab %}

{% tab title="查找类" %}
`List_all_info(self)：`列出所有的plugin

`List_all_name_version(self)：`列出所有plugin的插件名和版本号

`List_index_info(self,namelist)：`列出指定namelist的plugin的info,namelist是一个列表,返回一个info列表

```python
# 查找类,用于列出插件,及其相关信息,在使用时用p.List.List_index_info(["PluginB","asd","sad"])
class List:
    def __init__(self) -> None:
        pass
    # 列出所有的plugin
    def List_all_info(self):
        dic={}
        with open('myproject\PluginSystem\Plugins.json', 'r', encoding='utf-8') as f:
            dic = json.load(f)
            # print(dic)
        return dic
    
    # 列出所有plugin的插件名和版本号
    def List_all_name_version(self):
        dic={}
        dic_name_version=[]
        with open('myproject\PluginSystem\Plugins.json', 'r', encoding='utf-8') as f:
            dic = json.load(f)
            for i in dic["PluginList"]:
                item={}
                item["name"]=i["name"]
                item["version"]=i["version"]
                dic_name_version.append(item)
            # print(dic_name_version)
        return dic_name_version
    
    # 列出指定namelist的plugin的info,namelist是一个列表,返回一个info列表
    def List_index_info(self,namelist):
        Info_List=[]
        # print(List)
        with open('myproject\PluginSystem\Plugins.json', 'r', encoding='utf-8') as f:
            dic = json.load(f)
            for d in dic["PluginList"]:
                if d["name"]  in namelist:
                    Info_List.append(d)
        # print(Info_List)
        return Info_List
```

{% endtab %}

{% tab title="运行类" %}
`run_plugins(self,ArgsList=None)：`解析所有表单信息，根据指定参数运行对应插件

`__Generator_PluginObj(self,name=None)：`根据表单解析后的字典生成一个插件类（私有函数）

`show_result(self)：`展示运行结果调用插件的show\_result()

```python
# 运行类
class Run:   
    def __init__(self,project_name):
        self.plugin_threadings=[]
        self.project_name=project_name
        self.plugin_obj=[]
    # 解析所有表单信息，根据指定参数运行对应插件
    def run_plugins(self,ArgsList=None):
        if not ArgsList:
            return
        for i in ArgsList:
            name=i["name"]
            # 添加项目名称
            i["project_name"]=self.project_name
            # 创建插件对象
            obj=self.__Generator_PluginObj(name)
            if obj is None:
                continue
            threading=PluginThread(obj, i)
            self.plugin_threadings.append(threading)
            threading.start()
        # 创建监听监听每一个插件返回的消息
        # print("run_plugins:",[self.plugin_threadings])
        Build_Listener(ThreadList=[self.plugin_threadings],pluginlist=ArgsList,project_name=self.project_name)
        print("[+]所有插件运行结束")

    # 根据表单解析后的字典生成一个插件类（私有函数）
    def __Generator_PluginObj(self,name=None):
        if not name:
            return
        # 解析出插件名
        try:
            plugin_module = importlib.import_module(f"Plugins.{name}.{name}")
            # print(dir(plugin_module))
            plugin_class = getattr(plugin_module,name)
            plugin_obj=plugin_class()
            # 加入对象列表
            self.plugin_obj.append(plugin_obj)
            return plugin_obj
        except:
            print(f"[ERROR]插件{name}创建错误，原因可能是{name}不存在,请检测是否存在Plugins/{name}.py下是否有以{name}命名的class,详情请查插件编写规范")
            return None
    
    def show_result(self):
        for i in self.plugin_obj:
            try:
                i.show_result()
            except:
                pass

```

{% endtab %}

{% tab title="插件线程" %}
根据接收的插件对象，和运行参数创建一个线程运行该插件

```python
class PluginThread(threading.Thread):
    def __init__(self, plugin, args):
        threading.Thread.__init__(self)
        self.plugin = plugin
        self.args=args

    def run(self):
        try:
            self.plugin.run(self.args)
        except:
            print(f"插件{self.plugin.name}运行出错")
            return
```

{% endtab %}
{% endtabs %}

## **PART 2:MessageManager插件消息管理器**

<mark style="color:blue;">**目的：实现一个监听线程处理插件返回的消息**</mark>

* 处理插件返回的消息
* 处理插件返回的日志
* 储存插件返回的结果

{% tabs %}
{% tab title="监听器" %}
**监听多线程消息的实现原理：**&#x63D2;件接口会接收来自各个插件返回的消息交给一个<mark style="color:blue;">队列</mark>，开辟的<mark style="color:blue;">监听线程</mark>会实时监听该队列，并且解析从队列中读取的消息，分类交给Recorder库将消息存储起来

```python
import threading
import queue
from time import sleep
import os
import sys
# 获取上一级目录路径
parent_dir = os.path.dirname(os.path.dirname(os.path.abspath(__file__)))
sys.path.append(parent_dir)
from Recorder.Recorder import Recorder

TEST="TEST"

# 结束事件
all_plugins_end=threading.Event()
# 消息队列
message_queue=queue.Queue()

# message格式如下：
#     a={
#         "name":"PluginName",
#         "type":"log",
#         "level":"info",
#         "messages":[]
#     }
# 处理消息
def process_message(message):
    # print(message)
    if message.get("type","") == "log":
        message_recorder.log(message)
    elif message.get("type","") == "result":
        message_recorder.result(message)
    else:
        print("[ERROR]未知消息类型：",message)
        return

# 监听器——实现监听
def message_listener():
    print("[+]监听创建成功")
    while True:
        # print(all_plugins_end.is_set(),message_queue.empty())
        # 判断是否所有插件线程都结束了
        if all_plugins_end.is_set() and message_queue.empty():
                break
        try:
            message = message_queue.get(timeout=1)
            # 处理消息
            if message is None:
                continue
            process_message(message)
            # 处理完成
            message_queue.task_done()

        except queue.Empty:
            sleep(2)
            pass

# 创建监听接收一个线程列表
def Build_Listener(ThreadList,pluginlist,project_name):
    # print(ThreadList)
    global message_recorder
    message_recorder=Recorder(project_name)
    message_recorder.creat_dir(pluginlist)
    try:
        # 创建并启动消息监听器线程
        listener_thread = threading.Thread(target=message_listener)
        listener_thread.daemon = True  # 设置为守护线程，主线程结束时自动退出
        listener_thread.start()
    except Exception as e:
        print("[ERROR]监听器创建失败：\n\t",e)

    try:
        # 等待所有线程执行完成
        for i in ThreadList[0]:
            i.join()
        # 删除所有线程
        for i in ThreadList[0]:
            ThreadList[0].remove(i)
        # 设置事件，表示所有插件线程都结束了
        all_plugins_end.set()
    except Exception as e:
            print("[ERROR]监听线程错误：\n\t",e)
    try:
        # 等待消息队列中的消息都被处理完
        message_queue.join()
        print("[+]消息处理完成")
        # 等待消息监听器线程结束
        # listener_thread.join()
        print("[+]所有线程运行完成，监听结束")
    except Exception as e:
        print("[ERROR]消息队列监听错误：\n\t",e)

    return 

# 插件接口函数，插件调用返回message
def Put_message(message):
    # print(message)
    message_queue.put(message)
```

{% endtab %}
{% endtabs %}

## **PART 3:plugins.Interface插件接口**

* `Interface.Trans_message(message)`

向插件系统传递消息，插件系统会根据message处理消息

```python
# 传递消息
def Trans_message(message):
    mg.Put_message(message)
```

* `Get_HTMLExtractor(HTML="")`

获取一个HTML解析器，HTML提取器将会解析html的input输入框，form表单，textarea多行输入框，接口使用实例

```python
# 获取一个html解析类对象
def Get_HTMLExtractor(HTML=""):
    Extractor = EH.HTMLExtractor(HTML)
    return Extractor
```

* `Get_SelfDefExtractor(setting={})`

获得一个自定义提取器，提取指定的html元素

```python
# 获取一个自定义提取类对象
def Get_SelfDefExtractor(setting):
    Extractor = ES.Self_Defining_Extractor(setting)
    return Extractor
```

* `request_form(args)`

针对使用`HTMLExtractor.extract_form()`提取器提取的表单结果，发送请求的一个接口，返回一个响应

接收参数详解：

```
"""
request_form(args)参数：
    args={
        "form":{'form_name': 'form1', 'form_method': 'post', 'form_action': ' ', 'input_datas': [{'name': 'uname', 'type': 'text', 'id': None}, {'name': 'passwd', 'type': 'text', 'id': None}, {'name': 'submit', 'type': 'submit', 'id': None}]},
        "payload":"",
        "url":"",
        "cookies":{},
        "proxies":{},
        "random_ua":True,
    }
"""
```

具体实现：

```python
# 针对使用form提取器提取结果进行请求，返回一个请求
def request_form(args):
    # 设置UA头
    headers = {
        "User-Agent": "Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/58.0.3029.110 Safari/537.36"
    }
    # print("request_form:",args)
    form = args.get("form", None)
    # 如果没有参数则返回
    if form == None:
        print("[ERROR]request_get:form is None")
        return None
    # 判断是否随机ua
    if args.get("random_ua", True):
        ua = UserAgent()
        headers["User-Agent"] = ua.random
    # print("注入form表单")
    params = {}
    method = form.get("form_method", None)
    # 解析form构造payload
    def get_payload():
        payload=args.get("payload", None)
        # print(payload)
        # url编码
        # payload= urllib.parse.urlencode(payload)
        # print(payload)
        name = form.get("name", "id")
        for i in form.get("input_datas", []):
            if  i.get("name").lower() == "submit" :
                # 注意后续变动
                params["submit"] = "Submit"
                continue
            name=i.get("name", "id")
            params[name] = payload
        return
    get_payload()
    # print("request_form:",params)

    # 发送get请求
    if method == "get":
        # print("注入form表单,发送get请求")
        def Get_method():
            # print(params)
            # print("\n发送post请求",form.get("cookies", None))
            try:
                req = requests.get(
                    allow_redirects=False,
                    url=args.get("url", "127.0.0.1"),
                    params=params,
                    headers=headers,
                    cookies=form.get("cookies", None),
                    proxies=form.get("proxies", None),
                    timeout=5,
                )
                return req
            except Exception as e:
                pass
            
        return Get_method()
    # 发送post请求
    elif method == "post":
        # print("注入form表单,发送Post请求")
        def Post_method():
            # print(params)
            # print("\n发送post请求",form.get("cookies", None))
            try:
                # 修改
                req = requests.post(
                    allow_redirects=False,
                    url=args.get("url", "127.0.0.1"),
                    data=params,
                    headers=headers,
                    cookies=form.get("cookies", None),
                    proxies=form.get("proxies", None),
                    timeout=5,
                )
                # print(req.status_code)
                return req
            except Exception as e:
                pass
            
        return Post_method()
    else:
        # print("[Error]注入form表单失败method错误")
        return
```

* `request_input(args)`

针对使用`HTMLExtractor.extract_input()`提取器提取的input输入框结果，发送请求的一个接口，返回一个响应

```python
"""
request_input(args)参数：
    args={
        "input":{'name': 'passwd', 'type': 'text', 'value': '', 'form': 'form1', 'method': 'post'},
        "payload":"",
        "url":"",
        "cookies":{},
        "proxies":{},
        "random_ua":True,
    }
"""
# 针对使用input提取器提取结果进行请求，返回一个请求
def request_input(args):
    # 设置UA头
    headers = {
        "User-Agent": "Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/58.0.3029.110 Safari/537.36"
    }
    input = args.get("input", None)
    # print("input:",input)
    # 如果没有参数则返回
    if input == None:
        print("[ERROR]request_get:input is None")
        return None
    # 如果input属于一个表单则不进行请求
    if input.get("form", None) is not None:
        # print("input输入框属于一个表单不进行注入")
        return None
    # 设置随机ua头
    if args.get("random_ua", True):
        ua = UserAgent()
        # print(ua.random)
        headers["User-Agent"] = ua.random

    # 获取input输入框的变量名name和请求方法method
    name = input.get("name", "id")
    method = input.get("method", None)
    # 发送get请求
    if method == "get":
        def Get_method():
            params = {}
            params[name] = args.get("payload", None)
            # print(params)
            req = requests.get(
                url=args.get("url", "127.0.0.1"),
                params=params,
                headers=headers,
                cookies=args.get("cookies", None),
                proxies=args.get("proxies", None),
                timeout=(10, None),
                allow_redirects=False,
            )
            # print(req)
            return req
        return Get_method()
    # 发起post请求
    elif method == "post":
        def Post_method():
            params = {}
            params[name] = args.get("payload", None)
            # print(params)
            req = requests.post(
                allow_redirects=False,
                url=args.get("url", "127.0.0.1"),
                data=params,
                headers=headers,
                cookies=input.get("cookies", None),
                proxies=input.get("proxies", None),
                timeout=5,
            )
            # print(req.request)
            # print(req)
            return req
        return Post_method()
    else:
        return

```

* `Get_Storage()`

获得一个Storage对象用于向对数据库的操作：1.创建表单 2.向指定表单插入数据

```python
# 获取一个存储类对象
def Get_Storage():
    StoreObj = storage.Storage_Base()
    return StoreObj
```

* `GET_Recorder(project_name)`

<pre><code><strong># 获取一个记录器对象
</strong>def GET_Recorder(project_name):
    if project_name == None:
        print("[ERROR]GET_Recorder:project_name is None")
        return None
    RecorderObj = Recorder(project_name)
    return RecorderObj
</code></pre>

* `Get_file(plugin,filename,type)`

获得该插件的工作路径下的文件操作

```python
# 获得插件工作地址的文件
def Get_file(plugin,filename,type):
    file_dir=os.path.join(os.path.join(os.path.join(os.path.join(grandfather_dir,"History"),plugin.project_name),plugin.plugin_name),filename)
    file = open(file_dir, type)
    return file
```

* `Get_a_file(file_path,type)`

获得一个文件：

```python
# 获得一个文件
def Get_a_file(file_path,type):
    try:
        file = open(file_path, type)
        return file
    except Exception as e:
        print(e)
        pass
```

* `Get_url_from_scrapy(project_name)`

从scrapy扫描结果载入url文件

```python
def Get_url_from_scrapy(project_name):
    file_dir=os.path.join(os.path.join(os.path.join(grandfather_dir,"History"),project_name),"urls.txt")
    file = open(file_dir, "r")
    return file
```


---

# Agent Instructions: Querying This Documentation

If you need additional information that is not directly available in this page, you can query the documentation dynamically by asking a question.

Perform an HTTP GET request on the current page URL with the `ask` query parameter:

```
GET https://doc.why403.cn/gai-shu/cha-jian-xi-tong-shi-xian.md?ask=<question>
```

The question should be specific, self-contained, and written in natural language.
The response will contain a direct answer to the question and relevant excerpts and sources from the documentation.

Use this mechanism when the answer is not explicitly present in the current page, you need clarification or additional context, or you want to retrieve related documentation sections.
