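"""Web-scraping tool: renders pages through a self-hosted Splash service
and converts the resulting HTML to Markdown."""
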
import re
from urllib.parse import quote, urlsplit

import html2text
import requests
from urlextract import URLExtract

from core.types import BaseTool

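
# SplashDriver drives a Splash (headless browser) rendering service over
# HTTP: it builds a small Lua script per request, lets Splash load the page
# with JavaScript enabled, and returns the final HTML.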
class SplashDriver:

    def __init__(self, base_url: str):
        self.base_url = base_url
        self.headers = None

    def set_options(self, headers: dict = None):
        """Build the Lua header table Splash sends with every request."""
        headers_lua = """
        local headers = {
            ["User-Agent"] = "Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/134.0.0.0 Safari/537.36 Edg/134.0.0.0",
            ["Accept-Language"] = "zh-CN,zh;q=0.9,en;q=0.8,en-GB;q=0.7,en-US;q=0.6",
            ["Connection"] = "keep-alive",
            ["Cache-Control"] = "max-age=0",
            ["Upgrade-Insecure-Requests"] = "1",
            ["Accept"] = "text/html,application/xhtml+xml,application/xml;q=0.9,image/webp,image/apng,*/*;q=0.8,application/signed-exchange;v=b3;q=0.9",
            {{extra_headers}}
        }
        """
        if headers:
            # Each injected entry needs its own trailing comma to keep the
            # Lua table valid when several extra headers are supplied.
            headers_lua = headers_lua.replace(
                "{{extra_headers}}",
                '\n'.join(f'["{k}"] = "{v}",' for k, v in headers.items()))
        else:
            headers_lua = headers_lua.replace("{{extra_headers}}", "")
        self.headers = headers_lua

    def _set_lua_script(self, url: str):
        """Build the Lua script Splash executes to render the given URL."""
        if not self.headers:
            self.set_options()
        # The script is split into an f-string part (Python interpolation)
        # and a plain part, so the Lua braces below need no escaping.
        lua = f'''
        function main(splash, args)
            splash.images_enabled = false
            splash.private_mode_enabled = true
            splash.resource_timeout = 10.0
            local url = "{url}"
            {self.headers}
        ''' + '''
            -- make the request
            local ok, reason = splash:go({url, headers = headers})
            if not ok then
                return { error = reason }
            end

            -- return the rendered result
            return {
                html = splash:html(),
                url = splash:url(),
            }
        end
        '''
        return lua

    def get(self, url: str):
        """Render a URL through Splash and return the final HTML."""
        lua = self._set_lua_script(url)
        endpoint = f'{self.base_url}/execute?lua_source=' + quote(lua)

        response = requests.get(endpoint, timeout=30)
        data = response.json()
        if "error" in data:
            # The Lua script returns {error = reason} when splash:go fails.
            raise RuntimeError(f"Splash failed to load {url}: {data['error']}")
        return data["html"]
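

# WebScraper extracts URLs from a message, renders each page through
# Splash, strips noise tags, and returns the content as Markdown.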
class WebScraper(BaseTool):

    def __init__(self):
        name = "web_scraper"
        description = ("When a specific website address is mentioned, "
                       "relevant information can be fetched from that site.")
        super().__init__(name, description)
        self.extractor = URLExtract()
        self.driver = SplashDriver(base_url="http://192.168.1.100:8050")

        self.execute = self.search

    @staticmethod
    def _parse_html(html: str) -> str:
        """Strip noise tags from the HTML and convert it to Markdown."""
        html = re.sub(r"<style.*?>.*?</style>", "", html, flags=re.S)
        html = re.sub(r"<script.*?>.*?</script>", "", html, flags=re.S)
        html = re.sub(r"<textarea.*?>.*?</textarea>", "", html, flags=re.S)
        html = re.sub(r"<link.*?/>", "", html, flags=re.S)
        h = html2text.HTML2Text()
        h.ignore_links = False
        h.ignore_images = True
        h.ignore_tables = False
        h.ignore_emphasis = True
        h.ignore_headers = True
        h.ignore_br = True
        h.body_width = 0  # disable line wrapping
        text = h.handle(html)
        # Drop links whose anchor text is empty, e.g. "[](https://...)".
        text = re.sub(r"\[]\(.*?\)", "", text, flags=re.S)
        return text

    def set_headers(self, headers: dict):
        self.driver.set_options(headers)
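
    # Usage sketch (placeholder values): supply cookies or auth headers
    # once before calling search(), e.g.
    #   web_scraper.set_headers({"Cookie": "session=<your-session-id>"})
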
    def split_urls(self, message: str):
        """Extract unique URLs from a message, preserving their order."""
        urls = self.extractor.find_urls(message)
        return list(dict.fromkeys(urls))

    def get_uri_resource(self, url):
        """Fetch a URL through Splash and return its content as Markdown."""
        try:
            html = self.driver.get(url)
            markdown = self._parse_html(html)
            # Rewrite protocol-relative ("//host/...") and root-relative
            # ("/path") links into absolute ones; protocol-relative links
            # must be handled first so that "(/" does not re-match them.
            parts = urlsplit(url)
            markdown = markdown.replace("(//", f"({parts.scheme}://")
            markdown = markdown.replace("(/", f"({parts.scheme}://{parts.netloc}/")
            return markdown
        except Exception as e:
            print(f"Error fetching {url}: {e}")
            return "Failed to fetch web page information\n"

    def search(self, message: str):
        urls = self.split_urls(message)
        uri_resource = "Web page information:\n"
        for url in urls:
            uri_resource += self.get_uri_resource(url)
        return self.normal(uri_resource)


web_scraper = WebScraper()

if __name__ == '__main__':
    question = "https://cn.bing.com/search?q=%E5%8B%92%E5%B8%83%E6%9C%97%E8%A9%B9%E5%A7%86%E6%96%AF%E6%9C%80%E8%BF%91%E7%9A%84%E6%88%98%E7%BB%A9"
    result = web_scraper.search(question)
    print(result["data"])