🤖依赖模块实现
爬虫模块
这个部分是爬取页面的核心部分:
爬取方式:广度优先
爬取原理:通过URLExtractor提取出页面中的所有url,并用这些url继续发送请求,请求会被发送给scrapy内核中的一个工作队列,接收到响应后,将响应回调给parse函数以相同的方式继续对页面进行探索。
将提取的url返回给pipelines
import scrapy
from myproject.items import URLItem
from store import storage
from Extractor import URLExtractor
from Recorder.Recorder import Recorder
import logging
import re
# from fake_useragent import UserAgent
import myproject.settings as mysetting
logtemp={
"name":"scrapy",
"type":"log",
"level":"info",
"messages":[]
}
class TestSpider(scrapy.Spider):
name = "test"
allowed_domains = []
start_urls = []
cookies={}
def __init__(self,project_name="",cookies=None, allowed_domains=None, start_urls=None, *args, **kwargs):
super(TestSpider, self).__init__(*args, **kwargs)
logging.info("初始化spider")
self.project_name=project_name
self.cookies=cookies
self.allowed_domains=allowed_domains
self.start_urls=start_urls
# 创建记录类
self.recorder=Recorder(project_name)
# 创建存储类
self.storage=storage.Storage_Base()
# 创建URL提取器
self.url_extractor=URLExtractor.URLProcessor(allow_domains=self.allowed_domains,start_urls=self.start_urls)
# 爬虫开始运行前
def start_requests(self):
# 在爬虫启动时执行的操作,可以在这里发送初始请求
print('爬虫运行中...')
log=logtemp
log["messages"]=[f'爬虫开始运行:\n\t{self.cookies}\n\t{self.start_urls}\n\t{self.allowed_domains}\n']
print(log["messages"])
self.recorder.log(log)
# 请求初始URL
for start_url in self.start_urls:
log["messages"]=[f'请求初始url:{start_url}']
self.recorder.log(log)
print(log["messages"])
yield scrapy.Request(url=start_url, cookies=self.cookies,callback=self.parse,)
def parse(self, response):
# 提取并去重url
try:
# print("提取url")
urls=self.url_extractor.get_urls(response) # 格式化后的URL
De_duplication_urls=self.url_extractor.De_duplication_url(urls) # 去重后的URL
if De_duplication_urls:
for url in De_duplication_urls:
print(url)
yield response.follow(url, cookies=self.cookies,callback=self.parse) # 通过此url继续发出请求
item=URLItem()
item["urls"]=De_duplication_urls
yield item # 移交给pipline转储url数据
except Exception as e:
logging.error('Extract_url:',e)
提取模块
提取模块用于解析页面,提取出页面中的目标信息。
提取模块的实现:
提取模块中包含URL提取类、input标签提取类、多行输入框提取类、自定义提取类。
URL提取器功能:
提取页面中的所有链接,并且实现格式化以及去重处理。
1.URL提取类的初始函数
初始化函数中的
from scrapy.linkextractors import LinkExtractor
from urllib.parse import urlparse, urljoin, urlunparse
class URLProcessor:
#初始化函数
def __init__(self, allow_domains,start_url):
self.allow_domains = allow_domains
self.start_url = start_url
# 解析出所用的协议
self.protocol = start_url.split("://")[0]+"://"
# 一个url集合用于url去重
self.urls_set=set()
2.提取页面中所有链接
# 获取所有链接
def get_urls(self, response):
extractor = LinkExtractor(allow_domains=self.allow_domains)
links = extractor.extract_links(response)
urls = [link.url for link in links]
formated_urls=[]
for url in urls:
# 进行url格式化
url=self.normalize_url(url,self.start_url)
formated_urls.append(url)
return formated_urls
3.格式化url
# 格式化url
def normalize_url(url, base_url):
parsed_url = urlparse(url)
if not parsed_url.netloc: # 如果URL没有主域名
base_parsed_url = urlparse(base_url)
url = urljoin(base_url, url) # 将相对链接转换为绝对链接
parsed_url = urlparse(url) # 重新解析绝对链接
# 如果绝对链接还没有主域名,则将主域名添加到URL中
if not parsed_url.netloc:
url = urlunparse((base_parsed_url.scheme, base_parsed_url.netloc, parsed_url.path, parsed_url.params, parsed_url.query, parsed_url.fragment))
# 对URL进行标准化处理,去除末尾斜杠,将scheme和主机名转为小写等
normalized_url = urlunparse((parsed_url.scheme.lower(), parsed_url.netloc.lower(), parsed_url.path.rstrip('/'), parsed_url.params, parsed_url.query, parsed_url.fragment))
return normalized_url
4.URL去重
# URL去重
def De_duplication_url(self,urls):
Prossesed_urls=[]
for url in urls:
if url not in self.urls_set:
self.urls_set.add(url)
Prossesed_urls.append(url)
return Prossesed_urls
//待完善功能
5.判断是否是静态网页
要判断提取的URL是否为静态网页,你可以使用Python的requests库来发送HEAD请求,并检查响的Content-Type头部字段。如果Content-Type指示了静态文件类型(例如"text/html"、“text/plain”、"text/css"等),则可以认为该链接是静态网页。以下是相应的代码实现:
// Some code
以上代码会发送HEAD请求获取URL的响应头部信息,然后检查Content-Type字段中是否包含静态网页类型。如果是静态网页,函数会返回True,否则返回False。
存储模块
Recorder功能
创建工作文件夹
根据插件接口返回的log记录各插件日志
根据插件接口返回的result记录插件运行结果
import os
import sys
import logging
# 获取历史记录文件夹
history_dir=os.path.join(os.path.dirname(os.path.dirname(os.path.abspath(__file__))),'History')
# print(history_dir)
# 记录类
class Recorder:
def __init__(self,project_name) -> None:
self.project_name=project_name
project_dir=os.path.join(history_dir,project_name)
self.project_dir=project_dir
self.pluginDir_dic={}
self.pluginlist=[]
# 这里的pluginlist就是插件信息列表
'''
[
{
"name": "Nmap",
"version": "0.7.1",
"statu": "Nmap",
"require_options": {
"ip": "xxxxxxxxxxxxxxxx"
}
}
]'''
# 创建项目文件夹
def creat_dir(self,pluginlist):
# 创建项目文件夹
try:
os.mkdir(self.project_dir)
print(f"[+]创建项目{self.project_dir}文件夹成功")
except Exception as e:
print(f"[ERROR]新建项目{self.project_name}文件夹Exception:\n\t",e)
# 创建插件记录文件夹
for i in pluginlist:
try:
self.pluginlist.append(i['name']) # 只存储插件名:["A"]
plugin_dir=os.path.join(self.project_dir,i["name"])
self.pluginDir_dic[i['name']]=plugin_dir # 存储对应插件的地址:{"A":"A_dir"}
os.mkdir(plugin_dir)
print(f"[+]创建项目插件文件夹{plugin_dir}成功")
except Exception as e:
print(f"[ERROR]新建项目{self.project_name}插件文件夹Exception:\n\t",e)
# print(self.pluginDir_dic)
def creat_a_dir(self,plugin_name):
newdir=os.path.join(os.path.join(history_dir,self.project_name),plugin_name)
print(newdir)
try:
os.mkdir(newdir)
except Exception as e:
print(f"[ERROR]新建项目{self.project_name}插件文件夹Exception:\n\t",e)
self.pluginlist.append(plugin_name)
self.pluginDir_dic[plugin_name]=newdir
"""
将消息传给插件管理器的接口,接收一个字典
格式如下:
a={
"name":"PluginName",
"type":"log"
"level":"info"
"messages":[
"111111",
"222222"
]
}
"""
def log(self, message):
print("日志-记录~o.0")
print(message)
plugin_name=message["name"]
# 如果plugin_name不在pluginlist中就新建文件夹,并将该plugin_name添加到pluginlist,把新建的地址添加到pluginDir_dic
if(plugin_name not in self.pluginlist):
self.creat_a_dir(plugin_name)
logpath = self.pluginDir_dic[plugin_name]
log_level = message.get("level", "info") # 默认级别为info
log_file = os.path.join(logpath,"log.txt")
# 创建一个日志记录器
logger = logging.getLogger('my_logger')
logger.setLevel(logging.DEBUG)
# 创建一个文件处理器并指定日志文件路径
file_handler = logging.FileHandler(log_file)
# 设置文件处理器的日志级别
file_handler.setLevel(logging.DEBUG)
# 创建一个格式化器,定义日志消息的格式
formatter = logging.Formatter('%(asctime)s - %(levelname)s - %(message)s')
# 将格式化器应用到文件处理器
file_handler.setFormatter(formatter)
# 将文件处理器添加到日志记录器
logger.addHandler(file_handler)
# print(log_file)
# 根据级别添加日志
if log_level == "info":
# print(message["messages"])
for m in message["messages"]:
print(m)
logger.info(m)
elif log_level == "warning":
for m in message["messages"]:
logger.warning(m)
elif log_level == "error":
for m in message["messages"]:
logger.error(m)
elif log_level == "debug":
for m in message["messages"]:
logger.debug(m)
else:
print(f"未正确设定message['level']\n\tlevel分为debug,info,warning,error,critical")
return
"""
将消息传给插件管理器的接口,接收一个字典
格式如下:
a={
"name":"PluginName",
"type":"log",
"level":"info",
"messages":[
"111111",
"222222"
]
}
"""
# 处理结果
def result(self,message):
plugin_name=message["name"]
if(plugin_name not in self.pluginlist):
self.creat_a_dir(plugin_name)
print(f"[ERROR]插件{plugin_name}不存在,已根据插件{plugin_name}创建新的插件目录")
resultpath = self.pluginDir_dic.get(plugin_name)
result_file = resultpath + "/result.txt"
try:
with open(result_file, "a") as f:
for m in message["messages"]:
f.write(str(m)+"\n")
print(f"[+]插件{plugin_name}记录成功")
except Exception as e:
logging.critical("[ERROR]插件{}记录失败\n\t{}".format(plugin_name,e))
return
最后更新于
这有帮助吗?