🤖依赖模块实现

依赖模块由爬虫模块、提取模块、存储模块三部分组成。

爬虫模块由spider、pipelines、middlewares、settings四部分组成,依赖scrapy框架。

提取模块由HTMLExtractor、Self_Defining_Extractor、URLExtractor三部分组成。

存储模块由Recorder、Store两部分组成

爬虫模块

这个部分是爬取页面的核心部分:

  • 爬取方式:广度优先

  • 爬取原理:通过URLExtractor提取出页面中的所有url,并用这些url继续发送请求,请求会被发送给scrapy内核中的一个工作队列,接收到响应后,将响应回调给parse函数以相同的方式继续对页面进行探索。

  • 将提取的url返回给pipelines

import scrapy
from myproject.items import URLItem
from store import storage
from Extractor import URLExtractor
from Recorder.Recorder import Recorder
import logging
import re
# from fake_useragent import UserAgent
import myproject.settings as mysetting


logtemp={
    "name":"scrapy",
    "type":"log",
    "level":"info",
    "messages":[]
}

class TestSpider(scrapy.Spider):
    name = "test"
    allowed_domains = []
    start_urls = []
    cookies={}

    def __init__(self,project_name="",cookies=None, allowed_domains=None, start_urls=None, *args, **kwargs):
        super(TestSpider, self).__init__(*args, **kwargs)
        logging.info("初始化spider")
        self.project_name=project_name
        self.cookies=cookies
        self.allowed_domains=allowed_domains
        self.start_urls=start_urls

        # 创建记录类
        self.recorder=Recorder(project_name)
        # 创建存储类
        self.storage=storage.Storage_Base()
        # 创建URL提取器
        self.url_extractor=URLExtractor.URLProcessor(allow_domains=self.allowed_domains,start_urls=self.start_urls)
            
    # 爬虫开始运行前
    def start_requests(self):
        # 在爬虫启动时执行的操作,可以在这里发送初始请求
        print('爬虫运行中...')
        log=logtemp
        log["messages"]=[f'爬虫开始运行:\n\t{self.cookies}\n\t{self.start_urls}\n\t{self.allowed_domains}\n']
        print(log["messages"])
        self.recorder.log(log)
        # 请求初始URL
        for start_url in self.start_urls:
            log["messages"]=[f'请求初始url:{start_url}']
            self.recorder.log(log)
            print(log["messages"])
            yield scrapy.Request(url=start_url, cookies=self.cookies,callback=self.parse,)

    def parse(self, response):
        # 提取并去重url
        try:
            # print("提取url")
            urls=self.url_extractor.get_urls(response) # 格式化后的URL
            De_duplication_urls=self.url_extractor.De_duplication_url(urls) # 去重后的URL
            
            if De_duplication_urls:
                for url in De_duplication_urls:
                    print(url)
                    yield response.follow(url, cookies=self.cookies,callback=self.parse)    # 通过此url继续发出请求

                item=URLItem()
                item["urls"]=De_duplication_urls
                yield item  # 移交给pipline转储url数据

        except Exception as e:
            logging.error('Extract_url:',e)

提取模块

提取模块用于解析页面,提取出页面中的目标信息。

提取模块的实现:

提取模块中包含URL提取类、input标签提取类、多行输入框提取类、自定义提取类。

URL提取器功能:

提取页面中的所有链接,并且实现格式化以及去重处理。

1.URL提取类的初始函数

初始化函数中的

from scrapy.linkextractors import LinkExtractor
from urllib.parse import urlparse, urljoin, urlunparse
class URLProcessor:
    #初始化函数
    def __init__(self, allow_domains,start_url):
        self.allow_domains = allow_domains
        self.start_url = start_url
        # 解析出所用的协议
        self.protocol = start_url.split("://")[0]+"://"
        # 一个url集合用于url去重
        self.urls_set=set()

2.提取页面中所有链接

# 获取所有链接
def get_urls(self, response):
    extractor = LinkExtractor(allow_domains=self.allow_domains)
    links = extractor.extract_links(response)
    urls = [link.url for link in links]
    formated_urls=[]
    for url in urls:
        # 进行url格式化
        url=self.normalize_url(url,self.start_url)
        formated_urls.append(url)
    return formated_urls

3.格式化url

# 格式化url
def normalize_url(url, base_url):
    parsed_url = urlparse(url)
    
    if not parsed_url.netloc:  # 如果URL没有主域名
        base_parsed_url = urlparse(base_url)
        url = urljoin(base_url, url)  # 将相对链接转换为绝对链接
        
        parsed_url = urlparse(url)  # 重新解析绝对链接
        
        # 如果绝对链接还没有主域名,则将主域名添加到URL中
        if not parsed_url.netloc:
            url = urlunparse((base_parsed_url.scheme, base_parsed_url.netloc, parsed_url.path, parsed_url.params, parsed_url.query, parsed_url.fragment))
    
    # 对URL进行标准化处理,去除末尾斜杠,将scheme和主机名转为小写等
    normalized_url = urlunparse((parsed_url.scheme.lower(), parsed_url.netloc.lower(), parsed_url.path.rstrip('/'), parsed_url.params, parsed_url.query, parsed_url.fragment))
    
    return normalized_url

4.URL去重

# URL去重
def De_duplication_url(self,urls):
    Prossesed_urls=[]
    for url in urls:
        if url not in self.urls_set:
            self.urls_set.add(url)
            Prossesed_urls.append(url)
    return Prossesed_urls

//待完善功能

5.判断是否是静态网页

要判断提取的URL是否为静态网页,你可以使用Python的requests库来发送HEAD请求,并检查响的Content-Type头部字段。如果Content-Type指示了静态文件类型(例如"text/html"、“text/plain”、"text/css"等),则可以认为该链接是静态网页。以下是相应的代码实现:

// Some code

以上代码会发送HEAD请求获取URL的响应头部信息,然后检查Content-Type字段中是否包含静态网页类型。如果是静态网页,函数会返回True,否则返回False。

存储模块

Recorder功能

  • 创建工作文件夹

  • 根据插件接口返回的log记录各插件日志

  • 根据插件接口返回的result记录插件运行结果

import os
import sys
import logging
# 获取历史记录文件夹
history_dir=os.path.join(os.path.dirname(os.path.dirname(os.path.abspath(__file__))),'History')
# print(history_dir)

# 记录类
class Recorder:
    def __init__(self,project_name) -> None:
        self.project_name=project_name
        project_dir=os.path.join(history_dir,project_name)
        self.project_dir=project_dir
        self.pluginDir_dic={}
        self.pluginlist=[]

    # 这里的pluginlist就是插件信息列表
    '''
    [
        {
            "name": "Nmap",
            "version": "0.7.1",
            "statu": "Nmap",
            "require_options": {
                "ip": "xxxxxxxxxxxxxxxx"
            }
        }
    ]'''
    
    # 创建项目文件夹
    def creat_dir(self,pluginlist):
        # 创建项目文件夹
        try:
            os.mkdir(self.project_dir)
            print(f"[+]创建项目{self.project_dir}文件夹成功")
        except Exception as e:
            print(f"[ERROR]新建项目{self.project_name}文件夹Exception:\n\t",e)

        # 创建插件记录文件夹
        for i in pluginlist:
            try:
                self.pluginlist.append(i['name']) # 只存储插件名:["A"]
                plugin_dir=os.path.join(self.project_dir,i["name"])
                self.pluginDir_dic[i['name']]=plugin_dir # 存储对应插件的地址:{"A":"A_dir"}
                os.mkdir(plugin_dir)
                print(f"[+]创建项目插件文件夹{plugin_dir}成功")
            except Exception as e:
                print(f"[ERROR]新建项目{self.project_name}插件文件夹Exception:\n\t",e)
        # print(self.pluginDir_dic)

    def creat_a_dir(self,plugin_name):
        newdir=os.path.join(os.path.join(history_dir,self.project_name),plugin_name)
        print(newdir)
        try:
            os.mkdir(newdir)
        except Exception as e:
            print(f"[ERROR]新建项目{self.project_name}插件文件夹Exception:\n\t",e)
        self.pluginlist.append(plugin_name)
        self.pluginDir_dic[plugin_name]=newdir

    """
    将消息传给插件管理器的接口,接收一个字典
    格式如下:
        a={
            "name":"PluginName",
            "type":"log"
            "level":"info"
            "messages":[
                "111111",
                "222222"
            ]
        }
    """
    def log(self, message):
        print("日志-记录~o.0")
        print(message)
        plugin_name=message["name"]
        # 如果plugin_name不在pluginlist中就新建文件夹,并将该plugin_name添加到pluginlist,把新建的地址添加到pluginDir_dic
        if(plugin_name not in self.pluginlist):
            self.creat_a_dir(plugin_name)
        logpath = self.pluginDir_dic[plugin_name]
        log_level = message.get("level", "info")  # 默认级别为info
        log_file = os.path.join(logpath,"log.txt")

        # 创建一个日志记录器
        logger = logging.getLogger('my_logger')
        logger.setLevel(logging.DEBUG)
        # 创建一个文件处理器并指定日志文件路径
        file_handler = logging.FileHandler(log_file)
        # 设置文件处理器的日志级别
        file_handler.setLevel(logging.DEBUG)
        # 创建一个格式化器,定义日志消息的格式
        formatter = logging.Formatter('%(asctime)s - %(levelname)s - %(message)s')
        # 将格式化器应用到文件处理器
        file_handler.setFormatter(formatter)
        # 将文件处理器添加到日志记录器
        logger.addHandler(file_handler)
        # print(log_file)
        
        # 根据级别添加日志
        if log_level == "info":
            # print(message["messages"])
            for m in message["messages"]:
                print(m)
                logger.info(m)
        elif log_level == "warning":
            for m in message["messages"]:
                logger.warning(m)
        elif log_level == "error":
            for m in message["messages"]:
                logger.error(m)
        elif log_level == "debug":
            for m in message["messages"]:
                logger.debug(m)
        else:
            print(f"未正确设定message['level']\n\tlevel分为debug,info,warning,error,critical")
            return

    """
    将消息传给插件管理器的接口,接收一个字典
    格式如下:
        a={
            "name":"PluginName",
            "type":"log",
            "level":"info",
            "messages":[
                "111111",
                "222222"
            ]
        }
    """
    # 处理结果
    def result(self,message):
        plugin_name=message["name"]
        if(plugin_name not in self.pluginlist):
            self.creat_a_dir(plugin_name)
            print(f"[ERROR]插件{plugin_name}不存在,已根据插件{plugin_name}创建新的插件目录")
            
        resultpath = self.pluginDir_dic.get(plugin_name)
        result_file = resultpath + "/result.txt"
        try:
            with open(result_file, "a") as f:
                for m in message["messages"]:
                    f.write(str(m)+"\n")
            print(f"[+]插件{plugin_name}记录成功")
        except Exception as e:
            logging.critical("[ERROR]插件{}记录失败\n\t{}".format(plugin_name,e))
        return

最后更新于