🤖Dependency Module Implementation
The dependency modules consist of three parts: the crawler module, the extraction module, and the storage module.
The crawler module is made up of four parts (spider, pipelines, middlewares, settings) and is built on the Scrapy framework.
The extraction module is made up of three parts: HTMLExtractor, Self_Defining_Extractor, and URLExtractor.
The storage module is made up of two parts: Recorder and Store.
Crawler Module
This part is the core of page crawling:
Crawl strategy: breadth-first (see the note on scheduler queues in the settings below)
Crawl principle: URLExtractor extracts all URLs from a page, and new requests are issued for those URLs. Each request is handed to a work queue in the Scrapy core (the scheduler); when the response arrives, it is passed back to the parse callback, which explores the new page in the same way.
The extracted URLs are handed over to the pipelines.
import scrapy
from myproject.items import URLItem
from store import storage
from Extractor import URLExtractor
from Recorder.Recorder import Recorder
import logging
import re
# from fake_useragent import UserAgent
import myproject.settings as mysetting
logtemp={
"name":"scrapy",
"type":"log",
"level":"info",
"messages":[]
}
class TestSpider(scrapy.Spider):
name = "test"
allowed_domains = []
start_urls = []
cookies={}
def __init__(self,project_name="",cookies=None, allowed_domains=None, start_urls=None, *args, **kwargs):
super(TestSpider, self).__init__(*args, **kwargs)
logging.info("初始化spider")
self.project_name=project_name
self.cookies=cookies
self.allowed_domains=allowed_domains
self.start_urls=start_urls
# 创建记录类
self.recorder=Recorder(project_name)
# 创建存储类
self.storage=storage.Storage_Base()
# 创建URL提取器
self.url_extractor=URLExtractor.URLProcessor(allow_domains=self.allowed_domains,start_urls=self.start_urls)
# 爬虫开始运行前
def start_requests(self):
# 在爬虫启动时执行的操作,可以在这里发送初始请求
print('爬虫运行中...')
        log = dict(logtemp)  # copy the template so the module-level dict is not mutated
log["messages"]=[f'爬虫开始运行:\n\t{self.cookies}\n\t{self.start_urls}\n\t{self.allowed_domains}\n']
print(log["messages"])
self.recorder.log(log)
# 请求初始URL
for start_url in self.start_urls:
log["messages"]=[f'请求初始url:{start_url}']
self.recorder.log(log)
print(log["messages"])
yield scrapy.Request(url=start_url, cookies=self.cookies,callback=self.parse,)
def parse(self, response):
# 提取并去重url
try:
# print("提取url")
urls=self.url_extractor.get_urls(response) # 格式化后的URL
De_duplication_urls=self.url_extractor.De_duplication_url(urls) # 去重后的URL
if De_duplication_urls:
for url in De_duplication_urls:
print(url)
yield response.follow(url, cookies=self.cookies,callback=self.parse) # 通过此url继续发出请求
item=URLItem()
item["urls"]=De_duplication_urls
yield item # 移交给pipline转储url数据
except Exception as e:
            logging.error("Extract_url: %s", e)
The pipelines use the store library to save the extracted URLs to the database and to the working directory.
from itemadapter import ItemAdapter
import sys
import os
# 获取上一级目录路径
save_dir = os.path.dirname(os.path.dirname(os.path.abspath(__file__)))
sys.path.append(save_dir)
from store import storage
logtemp={
"name":"scrapy",
"type":"log",
"level":"info",
"messages":[]
}
class URLPipeline:
def open_spider(self, spider):
print(spider.project_name)
print("打开文件")
self.store = spider.storage
self.store.Connect_mysql()
# 测试日志创建
print("日志-启动~o.0")
        log = dict(logtemp)  # copy the template so the module-level dict is not mutated
        log["messages"]=["日志-启动~o.0"]
        spider.recorder.log(log)
        self.file = open(os.path.join(save_dir, "History", spider.project_name, "urls.txt"), "w")
# 创建数据库
try:
form_setting = {
"table_name": f"{spider.project_name}_url",
"columns": {
"url": "varchar(255)",
},
}
self.store.Creat_table(form_setting)
except Exception as e:
            log = dict(logtemp)  # copy so the "warning" level does not stick to the shared template
            log["messages"]=[f"Scrapy.URLPipeline: {e}"]
            log["level"]="warning"
            spider.recorder.log(log)
pass
def process_item(self, item, spider):
# print("写入文件")
if "urls" in item:
for url in item["urls"]:
# print(url)
urlitem = {
"table_name": f"{spider.project_name}_url",
"columns": {
"url": "url_str",
},
}
self.file.write(url + "\n")
urlitem["columns"]["url"]=url
self.store.insert_data(urlitem)
return item
def close_spider(self, spider):
print("[+]爬虫运行完成url成功转储\n")
num_urls=len(spider.url_extractor.urls_set)
        print(f"一共爬取了{num_urls}条url,存储在{os.path.join(save_dir, 'History', spider.project_name, 'urls.txt')}")
self.store.close()
self.file.close()
The middlewares implement random User-Agent rotation.
from scrapy import signals
from fake_useragent import UserAgent
# useful for handling different item types with a single interface
# 随机更换user-agent方法
class RandomUserAgentMiddlware(object):
def __init__(self, crawler):
super(RandomUserAgentMiddlware, self).__init__()
self.ua = UserAgent()
self.ua_type = crawler.settings.get("RANDOM_UA_TYPE", None)
@classmethod
def from_crawler(cls, crawler):
return cls(crawler)
def process_request(self, request, spider):
def get_ua():
# print(request.headers)
return getattr(self.ua, self.ua_type)
        if self.ua_type is not None:
# print("随机ua")
# print(self.ua_type)
request.headers.setdefault('User-Agent', get_ua())
//Proxy support will be added to this part in a follow-up, as sketched below.
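As a rough idea of that future work (not part of the current code), a proxy downloader middleware could look like the following sketch; the PROXY_LIST setting and the proxy addresses are hypothetical:
import random


class RandomProxyMiddleware(object):
    """Hypothetical middleware that assigns a random proxy to each request."""

    def __init__(self, proxies):
        self.proxies = proxies or []

    @classmethod
    def from_crawler(cls, crawler):
        # PROXY_LIST is an assumed setting, e.g. ["http://127.0.0.1:8080", ...]
        return cls(crawler.settings.getlist("PROXY_LIST"))

    def process_request(self, request, spider):
        if self.proxies:
            # Scrapy's HttpProxyMiddleware honours the "proxy" key in request.meta
            request.meta["proxy"] = random.choice(self.proxies)
Like the User-Agent middleware above, it would be registered in DOWNLOADER_MIDDLEWARES.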
Scrapy configuration file (settings):
BOT_NAME = "myproject"
# 是否随机UA头
RANDOM_UA = True
# 随机UA头选项 "random" "chrome" "edge" "firefox" "safari"
RANDOM_UA_TYPE = "random"
SPIDER_MODULES = ["myproject.spiders"]
NEWSPIDER_MODULE = "myproject.spiders"
# UA头设置
USER_AGENT = "Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/119.0.0.0 Safari/537.36 Edg/119.0.0.0"
# MIDDLEWARES 设置
DOWNLOADER_MIDDLEWARES = {
'myproject.middlewares.RandomUserAgentMiddlware': 100,
}
# 是否遵循robots.txt rules
ROBOTSTXT_OBEY = True
RANDOM_UA_PER_REQUEST = True
# 爬取深度
DEPTH_LIMIT = 4
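# Note: Scrapy schedules requests depth-first by default; to actually crawl
# breadth-first as described above, settings like the following are typically
# also needed (left commented out here as a suggestion):
# DEPTH_PRIORITY = 1
# SCHEDULER_DISK_QUEUE = "scrapy.squeues.PickleFifoDiskQueue"
# SCHEDULER_MEMORY_QUEUE = "scrapy.squeues.FifoMemoryQueue"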
# 爬取延时
DOWNLOAD_DELAY = 0
ITEM_PIPELINES = {
'myproject.pipelines.URLPipeline': 100,
}
REQUEST_FINGERPRINTER_IMPLEMENTATION = "2.7"
TWISTED_REACTOR = "twisted.internet.asyncioreactor.AsyncioSelectorReactor"
FEED_EXPORT_ENCODING = "utf-8"
LOG_LEVEL= 'WARNING'
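For context, this is a minimal sketch of how TestSpider could be launched programmatically with these settings; the module path of the spider and all argument values below are placeholder assumptions:
from scrapy.crawler import CrawlerProcess
from scrapy.utils.project import get_project_settings

from myproject.spiders.test import TestSpider  # assumed module path for the spider above

if __name__ == "__main__":
    process = CrawlerProcess(get_project_settings())
    # keyword arguments are forwarded to TestSpider.__init__
    process.crawl(
        TestSpider,
        project_name="demo_project",
        cookies={"session": "xxxx"},
        allowed_domains=["example.com"],
        start_urls=["http://example.com"],
    )
    process.start()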
Extraction Module
The extraction module parses pages and pulls out the target information.
Implementation of the extraction module:
The extraction module contains a URL extraction class, an input-tag extraction class, a textarea extraction class, and a self-defined (custom) extraction class.
URL extractor functions:
Extract all links from a page, then normalize and deduplicate them.
1. Initialization of the URL extraction class
The initialization function stores the allowed domains and the start URLs, derives the protocol in use, and creates a set used for URL deduplication.
from scrapy.linkextractors import LinkExtractor
from urllib.parse import urlparse, urljoin, urlunparse
class URLProcessor:
    # initializer; the spider passes its list of start URLs
    def __init__(self, allow_domains, start_urls):
        self.allow_domains = allow_domains
        # use the first start URL as the base for resolving relative links
        self.start_url = start_urls[0]
        # protocol used by the target site
        self.protocol = self.start_url.split("://")[0] + "://"
        # a set of URLs used for deduplication
        self.urls_set = set()
2. Extract all links from the page
# 获取所有链接
def get_urls(self, response):
extractor = LinkExtractor(allow_domains=self.allow_domains)
links = extractor.extract_links(response)
urls = [link.url for link in links]
formated_urls=[]
for url in urls:
# 进行url格式化
url=self.normalize_url(url,self.start_url)
formated_urls.append(url)
return formated_urls
3. Normalize URLs
# 格式化url
    def normalize_url(self, url, base_url):
parsed_url = urlparse(url)
if not parsed_url.netloc: # 如果URL没有主域名
base_parsed_url = urlparse(base_url)
url = urljoin(base_url, url) # 将相对链接转换为绝对链接
parsed_url = urlparse(url) # 重新解析绝对链接
# 如果绝对链接还没有主域名,则将主域名添加到URL中
if not parsed_url.netloc:
url = urlunparse((base_parsed_url.scheme, base_parsed_url.netloc, parsed_url.path, parsed_url.params, parsed_url.query, parsed_url.fragment))
# 对URL进行标准化处理,去除末尾斜杠,将scheme和主机名转为小写等
normalized_url = urlunparse((parsed_url.scheme.lower(), parsed_url.netloc.lower(), parsed_url.path.rstrip('/'), parsed_url.params, parsed_url.query, parsed_url.fragment))
return normalized_url
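A quick illustration of the normalization (the URLs are made up):
processor = URLProcessor(allow_domains=["example.com"], start_urls=["http://example.com"])
# a relative link is resolved against the base URL, then the scheme and host are
# lower-cased and the trailing slash is stripped
print(processor.normalize_url("/About/", "http://Example.com/index"))
# -> http://example.com/About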
4. Deduplicate URLs
# URL去重
def De_duplication_url(self,urls):
Prossesed_urls=[]
for url in urls:
if url not in self.urls_set:
self.urls_set.add(url)
Prossesed_urls.append(url)
return Prossesed_urls
//Feature still to be completed
5. Determine whether a URL points to a static page
To determine whether an extracted URL points to a static page, Python's requests library can be used to send a HEAD request and inspect the Content-Type header of the response. If the Content-Type indicates a static file type (for example "text/html", "text/plain" or "text/css"), the link can be treated as a static page. A corresponding implementation is sketched below.
// Some code
Such a check sends a HEAD request to fetch the URL's response headers and then looks at the Content-Type field; it returns True if the type indicates a static page and False otherwise.
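A minimal sketch of this check, assuming the requests library is available (the function name is_static_page is illustrative, not part of the project):
import requests

# content types that are treated as "static pages"
STATIC_CONTENT_TYPES = ("text/html", "text/plain", "text/css")

def is_static_page(url, timeout=5):
    """Send a HEAD request and decide from Content-Type whether the URL looks static."""
    try:
        response = requests.head(url, allow_redirects=True, timeout=timeout)
        content_type = response.headers.get("Content-Type", "")
        return any(t in content_type for t in STATIC_CONTENT_TYPES)
    except requests.RequestException:
        # unreachable URLs are treated as non-static
        return False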
HTML extractor functions:
Extract specified tags and attributes, as well as user-defined tag/attribute combinations.
1. HTML extractor initializer
from bs4 import BeautifulSoup
class HTMLExtractor:
def __init__(self,html):
self.html=html
2. Input extractor
# input提取器
def extract_inputs(self):
inputs = []
# 使用 Beautiful Soup 解析 HTML
soup = BeautifulSoup(self.html, 'html.parser')
# 查找所有的 input 标签
input_tags = soup.find_all('input')
# 遍历每个 input 标签
for tag in input_tags:
input_info = {}
# 提取属性信息
input_info['name'] = tag.get('name')
input_info['type'] = tag.get('type')
input_info['value'] = tag.get('value')
# 查找 input 标签所在的表单
form_tag = tag.find_parent('form')
if form_tag:
# 提取表单名和提交方式
input_info['form'] = form_tag.get('name')
input_info['method'] = form_tag.get('method')
# 将每个 input 的信息添加到列表中
inputs.append(input_info)
return inputs
3. Textarea (multi-line input) extractor
# 多行输入域提取器
def extract_textareas(self):
textareas = []
# 使用 Beautiful Soup 解析 HTML
soup = BeautifulSoup(self.html, 'html.parser')
# 查找所有的 textarea 标签
textarea_tags = soup.find_all('textarea')
# 遍历每个 textarea 标签
for tag in textarea_tags:
textarea_info = {}
# 提取属性信息
textarea_info['name'] = tag.get('name')
# 查找 textarea 标签所在的表单
form_tag = tag.find_parent('form')
if form_tag:
# 提取表单名和提交方式
textarea_info['form'] = form_tag.get('name')
textarea_info['method'] = form_tag.get('method')
# 将每个 textarea 的信息添加到列表中
textareas.append(textarea_info)
return textareas
4. Form extractor
def extract_form(self):
html=self.html
# 使用BeautifulSoup解析HTML
soup = BeautifulSoup(html, "html.parser")
# 查找所有的表单
forms = soup.find_all("form")
form_datas=[]
# 遍历每个表单并提取内容
for form in forms:
form_data={}
form_data['form_name']=form.get("name")
form_data['form_method']=form.get("method")
form_data['form_action']=form.get("action")
form_data['input_datas']=[]
# 获取表单的所有输入字段
inputs = form.find_all("input")
# 遍历每个输入字段并提取内容
for input_tag in inputs:
# 一条input信息
input_data={}
# 获取字段的名称和值
input_data['name'] = input_tag.get("name")
input_data['type'] = input_tag.get("type")
input_data['id'] = input_tag.get("id")
form_data['input_datas'].append(input_data)
# 将表单数据添加到列表中
form_datas.append(form_data)
return form_datas
Example usage of the HTML extractor:
import HTMLExtractor as HE
def html_test():
html='''
<!DOCTYPE html>
<html>
<head>
<title>程序测试</title>
</head>
<body>
<h1>程序测试</h1>
<form>
<label for="name">姓名:</label>
<input type="text" id="name" name="name" required><br><br>
<label for="email">邮箱:</label>
<input type="email" id="email" name="email" required><br><br>
<label for="message">留言:</label><br>
<textarea id="message" name="message" rows="4" cols="50" required></textarea><br><br>
<input type="submit" value="提交">
</form>
</body>
</html>
'''
html_extractor=HE.HTMLExtractor(html)
print(html_extractor.extract_form())
print(html_extractor.extract_inputs())
print(html_extractor.extract_textareas())
# 运行测试
if __name__ == '__main__':
# run_extractor_test()
html_test()
Form extraction result:
Structure of the result: form_name is the form name, form_method is the submission method, form_action is the action executed on submit, and input_datas is a list of input fields.
[{'form_name': None, 'form_method': None, 'form_action': None, 'input_datas': [{'name': 'name', 'type': 'text', 'id': 'name'}, {'name': 'email', 'type': 'email', 'id': 'email'}, {'name': None, 'type': 'submit', 'id': None}]}]
Input extraction result:
Structure of the result: name is the parameter name, type is the input type, value is the parameter value, form is the form the input belongs to, and method is the submission method.
[{'name': 'name', 'type': 'text', 'value': None, 'form': None, 'method': None}, {'name': 'email', 'type': 'email', 'value': None, 'form': None, 'method': None}, {'name': None, 'type': 'submit', 'value': '提交', 'form': None, 'method': None}]
Textarea (multi-line input) extraction result:
[{'name': 'message', 'form': None, 'method': None}]
Structure of the result: name is the parameter name, form is the form it belongs to, and method is the submission method.
Custom tag extractor
'''
Custom tag extractor
Parameters:
    tag_name: name of the tag to extract
    attributes: attributes the target tags must match [a list of dicts]
    extract_attrs: names of the attributes to extract [a list]
    extract_content: whether to extract the text content of the tag
Input format:
    {'tag_name': 'tag_name',
     'attributes': [{'attribute1': '1'}, {'attribute2': '2'}],
     'extract_attrs': ['extract_attr1', 'extract_attr2'],
     'extract_content': False}
'''
from bs4 import BeautifulSoup

class Self_Defining_Extractor:
tag_name=None
attributes=None
extract_attrs=None
extract_content=False # 默认不提取内容
def __init__(self,setting):
if setting:
self.tag_name=setting.get('tag_name')
self.attributes=setting.get('attributes')
self.extract_attrs=setting.get('extract_attrs')
self.extract_content=setting.get('extract_content')
def extract_tag_info(self, html):
tag_info = []
# 使用 Beautiful Soup 解析 HTML
soup = BeautifulSoup(html, 'html.parser')
# 查找所有指定标签名的标签
tags = soup.find_all(self.tag_name)
# print(tags) # 测试
# 遍历每个标签
for tag in tags:
# 检查标签属性是否匹配
            if not self.attributes or all(tag.get(attr_name) == attr_value for attrs in self.attributes for attr_name, attr_value in attrs.items()):
# 提取标签的信息
tag_data = {'tag': tag.name}
if self.extract_attrs:
for attr in self.extract_attrs:
attr_name = attr
tag_data[attr_name] = tag.get(attr_name)
if self.extract_content:
tag_data['content'] = tag.get_text()
tag_info.append(tag_data)
return tag_info
Example usage of the custom extractor:
The class can be used directly on an HTML string: pass the HTML to extract_tag_info and supply the correct tag name, attribute-filter list, and list of attributes to extract in the setting dict.
Example:
# 用于测试自定义提取器的
from bs4 import BeautifulSoup
import Self_Defining_Extractor as SDE
# 测试函数
def run_extractor_test():
# 测试HTML内容
html = '''
<!DOCTYPE html>
<html>
<head>
<title>Complex HTML Test</title>
</head>
<body>
<div class="container">
<h1>Welcome to My Website</h1>
<p>This is a paragraph.</p>
<ul class="menu">
<li><a href="#">Home</a></li>
<li><a href="#">About</a></li>
<li><a href="#">Services</a></li>
<li><a href="#">Contact</a></li>
</ul>
</div>
<div class="container">
<h2>Featured Products</h2>
<div class="product">
<img src="product1.jpg" alt="Product 1">
<h3>Product 1</h3>
<p>Description of Product 1.</p>
</div>
<div class="product">
<img src="product2.jpg" alt="Product 2">
<h3>Product 2</h3>
<p>Description of Product 2.</p>
</div>
</div>
</body>
</html>
'''
# 自定义提取器参数配置
'''
要提取的标签:'img'
标签属性:src="product2.jpg"
要提取的属性:[src,alt]
不提取标签内容
'''
setting={
'tag_name': 'img',
'attributes': [{'src':'product2.jpg'}],
'extract_attrs': ['src','alt'],
'extract_content': False
}
# 创建自定义标签提取器的实例
extractor = SDE.Self_Defining_Extractor(setting)
# 提取标签信息
tag_info = extractor.extract_tag_info(html)
print(tag_info)
# 运行测试
if __name__ == '__main__':
run_extractor_test()
Output:
[{'tag': 'img', 'src': 'product2.jpg', 'alt': 'Product 2'}]
The code above outputs the information of the <img> tag whose src attribute is "product2.jpg", including the values of the attributes listed in extract_attrs, in the form:
{'attribute1': 'value1', 'attribute2': 'value2'}
If the extract_content parameter is set to True, the text content of the tag is extracted as well.
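For instance, a variant of the setting from the example above (using the same test HTML) that also keeps the link text of the menu entries:
setting = {
    'tag_name': 'a',                 # extract <a> tags
    'attributes': [{'href': '#'}],   # only those whose href attribute is "#"
    'extract_attrs': ['href'],       # keep the href value
    'extract_content': True          # also keep the tag's text content
}
# with the test HTML above, extract_tag_info(html) returns entries such as
# {'tag': 'a', 'href': '#', 'content': 'Home'}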
Storage Module
Recorder functions:
Create the project working directory
Record each plugin's log messages returned through the plugin interface
Record each plugin's run results returned through the plugin interface
import os
import sys
import logging
# 获取历史记录文件夹
history_dir=os.path.join(os.path.dirname(os.path.dirname(os.path.abspath(__file__))),'History')
# print(history_dir)
# 记录类
class Recorder:
def __init__(self,project_name) -> None:
self.project_name=project_name
project_dir=os.path.join(history_dir,project_name)
self.project_dir=project_dir
self.pluginDir_dic={}
self.pluginlist=[]
# 这里的pluginlist就是插件信息列表
'''
[
{
"name": "Nmap",
"version": "0.7.1",
"statu": "Nmap",
"require_options": {
"ip": "xxxxxxxxxxxxxxxx"
}
}
]'''
# 创建项目文件夹
def creat_dir(self,pluginlist):
# 创建项目文件夹
try:
os.mkdir(self.project_dir)
print(f"[+]创建项目{self.project_dir}文件夹成功")
except Exception as e:
print(f"[ERROR]新建项目{self.project_name}文件夹Exception:\n\t",e)
# 创建插件记录文件夹
for i in pluginlist:
try:
self.pluginlist.append(i['name']) # 只存储插件名:["A"]
plugin_dir=os.path.join(self.project_dir,i["name"])
self.pluginDir_dic[i['name']]=plugin_dir # 存储对应插件的地址:{"A":"A_dir"}
os.mkdir(plugin_dir)
print(f"[+]创建项目插件文件夹{plugin_dir}成功")
except Exception as e:
print(f"[ERROR]新建项目{self.project_name}插件文件夹Exception:\n\t",e)
# print(self.pluginDir_dic)
def creat_a_dir(self,plugin_name):
newdir=os.path.join(os.path.join(history_dir,self.project_name),plugin_name)
print(newdir)
try:
os.mkdir(newdir)
except Exception as e:
print(f"[ERROR]新建项目{self.project_name}插件文件夹Exception:\n\t",e)
self.pluginlist.append(plugin_name)
self.pluginDir_dic[plugin_name]=newdir
"""
    Interface that passes messages to the plugin manager; it receives a dict
    in the following format:
    a={
        "name":"PluginName",
        "type":"log",
        "level":"info",
        "messages":[
            "111111",
            "222222"
        ]
    }
"""
def log(self, message):
print("日志-记录~o.0")
print(message)
plugin_name=message["name"]
# 如果plugin_name不在pluginlist中就新建文件夹,并将该plugin_name添加到pluginlist,把新建的地址添加到pluginDir_dic
if(plugin_name not in self.pluginlist):
self.creat_a_dir(plugin_name)
logpath = self.pluginDir_dic[plugin_name]
log_level = message.get("level", "info") # 默认级别为info
log_file = os.path.join(logpath,"log.txt")
        # create one logger per plugin so messages are not mixed between plugins
        logger = logging.getLogger(f"recorder.{plugin_name}")
        logger.setLevel(logging.DEBUG)
        # only attach the file handler once, otherwise every call to log()
        # would add another handler and duplicate each message
        if not logger.handlers:
            # file handler writing to the plugin's log file
            file_handler = logging.FileHandler(log_file)
            file_handler.setLevel(logging.DEBUG)
            # formatter defining the log message layout
            formatter = logging.Formatter('%(asctime)s - %(levelname)s - %(message)s')
            file_handler.setFormatter(formatter)
            logger.addHandler(file_handler)
# print(log_file)
# 根据级别添加日志
if log_level == "info":
# print(message["messages"])
for m in message["messages"]:
print(m)
logger.info(m)
elif log_level == "warning":
for m in message["messages"]:
logger.warning(m)
elif log_level == "error":
for m in message["messages"]:
logger.error(m)
elif log_level == "debug":
for m in message["messages"]:
logger.debug(m)
else:
print(f"未正确设定message['level']\n\tlevel分为debug,info,warning,error,critical")
return
"""
    Interface that passes result messages to the plugin manager; it receives a dict
    in the following format:
    a={
        "name":"PluginName",
        "type":"result",
        "level":"info",
        "messages":[
            "111111",
            "222222"
        ]
    }
    """
    # handle results
def result(self,message):
plugin_name=message["name"]
if(plugin_name not in self.pluginlist):
self.creat_a_dir(plugin_name)
print(f"[ERROR]插件{plugin_name}不存在,已根据插件{plugin_name}创建新的插件目录")
resultpath = self.pluginDir_dic.get(plugin_name)
result_file = resultpath + "/result.txt"
try:
with open(result_file, "a") as f:
for m in message["messages"]:
f.write(str(m)+"\n")
print(f"[+]插件{plugin_name}记录成功")
except Exception as e:
logging.critical("[ERROR]插件{}记录失败\n\t{}".format(plugin_name,e))
return
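A minimal usage sketch of the Recorder, following the message format documented above; the project name, plugin entry, and messages are placeholder values, and the History directory is assumed to already exist:
from Recorder.Recorder import Recorder  # assumed import path, matching the spider above

recorder = Recorder("demo_project")
# creates History/demo_project/ plus one sub-directory per plugin
recorder.creat_dir([{"name": "Nmap", "version": "0.7.1"}])

# appends a log entry to History/demo_project/Nmap/log.txt
recorder.log({
    "name": "Nmap",
    "type": "log",
    "level": "info",
    "messages": ["scan started"],
})

# appends a result line to History/demo_project/Nmap/result.txt
recorder.result({
    "name": "Nmap",
    "type": "result",
    "messages": ["80/tcp open http"],
})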
Storage class functions:
Responsible for interacting with the database, creating custom tables, and storing data.
import pymysql
from . import mysql_setting
import logging
class Storage_Base:
#配置数据库
def __init__(self):
self.mysql_host = mysql_setting.mysql_host
self.mysql_port = mysql_setting.mysql_port
self.mysql_user = mysql_setting.mysql_user
self.mysql_password = mysql_setting.mysql_password
self.mysql_db = mysql_setting.mysql_db
self.tables=[]
self.tables_name=[]
# 建立连接
    def Connect_mysql(self):
        try:
            self.conn = pymysql.connect(
                host=self.mysql_host,
                port=self.mysql_port,
                user=self.mysql_user,
                password=self.mysql_password,
                db=self.mysql_db,
                charset='utf8'
            )
            self.cursor = self.conn.cursor()
            logging.info("Connect_mysql: 连接成功 [Connection success]")
        except Exception as e:
            logging.error("Connect_mysql: 连接失败 [Connection failure] %s" % e)
            return False
        return True
# 关闭连接
def close(self):
self.cursor.close()
self.conn.close()
'''
根据setting创建表单
setting格式:
setting={
'table_name':'urlitem',
'columns':{
'url':'varchar(255)',
'cookie':'varchar(255)',
'tag':'varchar(255)',
'type':'varchar(255)',
'name':'varchar(255)'
}
}
创建表单
'''
def Creat_table(self, setting):
self.setting = setting
if 'table_name' in setting and setting['table_name']:
table_name = setting['table_name']
self.tables.append(setting)
self.tables_name.append(table_name)
else:
logging.error("Creat_table: table_name is null")
return False
if 'columns' in setting and setting['columns']:
columns = []
for column_name, column_value in setting['columns'].items():
columns.append(f"{column_name} {column_value}")
columns_str = ", ".join(columns)
# 注意此语句会造成sql注入,注意后续修改
query="CREATE TABLE IF NOT EXISTS {} ({})".format(table_name,columns_str)
# print(query)
try:
self.cursor.execute(query)
# 检查 'id' 列是否已经存在
self.cursor.execute("SHOW COLUMNS FROM {} LIKE 'id'".format(table_name))
result = self.cursor.fetchone()
print(result)
if result is None:
self.cursor.execute("ALTER TABLE {} ADD COLUMN id INT PRIMARY KEY AUTO_INCREMENT".format(table_name))
logging.info("create_table: 表单创建成功 [Table created successfully]")
else:
logging.info("create_table: 'id' 列已经存在 [Column 'id' already exists]")
except Exception as e:
logging.error("create_table: 表单创建失败 [Table creation failure]: %s" % e)
else:
logging.error("Creat_table: columns is null")
return False
# 判断item是否符合存储要求(私有函数)
def _validate_item_for_insert(self,item):
# 判断存储对象item是否为空
if not item:
logging.error("Insert_data: 传入存储对象为空 [item is null]")
return False
# 判断table_name是否为空
if not item.get('table_name'):
logging.error("Insert_data: 表单名为空 [table_name is null]")
return False
else :
table_name = item.get('table_name')
# 判断表单是否存在
if table_name not in self.tables_name:
logging.error("Insert_data: 表单不存在请先建立表单 [The table does not exist. Please create the table first]")
return False
table = self.tables[self.tables_name.index(table_name)]
setting_columns = table.get('columns').keys()
item_columns = item.get('columns').keys()
if setting_columns != item_columns:
logging.error("Insert_data: 待插入表单的列名与对应表单的列名不匹配 [Column names of the item do not match the table's column names]")
return False
else:
return True
'''
提取的内容格式
item={
'table_name':'urlitem',
'columns':{
'url':'url_str',
'cookie':'cookie_str',
'tag':'tag_str',
'type':'type_str',
'name':'name_str'
}
}
'''
def insert_data(self, item):
if not self._validate_item_for_insert(item):
logging.error("Insert_data: 数据不合法请查看你的item是否与对应setting格式相同 [Insert data failed]")
return False
table_name = item['table_name']
columns = item['columns']
column_names = list(columns.keys())
column_values = list(columns.values())
placeholders = ", ".join(["%s"] * len(column_names))
sql = f"INSERT INTO {table_name} ({', '.join(column_names)}) VALUES ({placeholders})"
try:
with self.conn.cursor() as cursor:
cursor.execute(sql, tuple(column_values))
self.conn.commit()
logging.info("Insert_data: 数据插入成功 [Insert data successfully]")
except Exception:
logging.exception("Insert_data: 数据插入失败 [Insert data failed]")
return False
return True
//To be improved: input data is not sanitized, which can lead to SQL injection vulnerabilities.
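A minimal usage sketch of Storage_Base, following the setting and item formats documented in the code above; the table name and values are placeholders, and mysql_setting is assumed to be filled in:
from store import storage  # assumed import path, matching the spider above

store = storage.Storage_Base()
store.Connect_mysql()

# create a table from a setting dict (an id primary key column is added automatically)
store.Creat_table({
    "table_name": "demo_url",
    "columns": {
        "url": "varchar(255)",
    },
})

# insert one row; the item's column names must match the table's columns
store.insert_data({
    "table_name": "demo_url",
    "columns": {
        "url": "http://example.com/index",
    },
})

store.close()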