chat-bot/function/web_tool.py
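"""Web scraping tool: fetches pages through a Splash rendering service and
converts the returned HTML to plain text for the chat bot."""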

import re
from urllib.parse import quote
import html2text
import requests
from urlextract import URLExtract
from core.types import BaseTool


class SplashDriver:
    """Thin client for a Splash rendering service."""

    def __init__(self, base_url: str):
        self.base_url = base_url
        self.headers = None

    def set_options(self, headers: dict | None = None):
        """Build the Lua headers table for Splash, optionally merging extra headers."""
        headers_lua = """
        local headers = {
            ["User-Agent"] = "Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/134.0.0.0 Safari/537.36 Edg/134.0.0.0",
            ["Accept-Language"] = "zh-CN,zh;q=0.9,en;q=0.8,en-GB;q=0.7,en-US;q=0.6",
            ["Connection"] = "keep-alive",
            ["Cache-Control"] = "max-age=0",
            ["Upgrade-Insecure-Requests"] = "1",
            ["Accept"] = "text/html,application/xhtml+xml,application/xml,*/*;q=0.9,image/webp,image/apng,*/*;q=0.8,application/signed-exchange;v=b3;q=0.9",
            {{extra_headers}}
        }
        """
        if headers:
            # Lua table fields must be separated by commas, not just newlines.
            headers_lua = headers_lua.replace(
                "{{extra_headers}}",
                ',\n'.join(f'["{k}"] = "{v}"' for k, v in headers.items()))
        else:
            headers_lua = headers_lua.replace("{{extra_headers}}", "")
        self.headers = headers_lua
    def _set_lua_script(self, url: str):
        """Compose the Lua script executed by Splash for the given url."""
        if not self.headers:
            self.set_options()
        lua = f'''
        function main(splash, args)
            splash.images_enabled = false
            splash.private_mode_enabled = true
            splash.resource_timeout = 10.0
            local url = "{url}"
            {self.headers}
        ''' + '''
            -- navigate to the page with the prepared headers
            local ok, reason = splash:go({url, headers = headers })
            if not ok then
                return { error = reason }
            end
            -- return the rendered HTML and the final URL
            return {
                html = splash:html(),
                url = splash:url(),
            }
        end
        '''
        return lua
    def get(self, url: str):
        """Fetch the rendered HTML of url through the Splash /execute endpoint."""
        lua = self._set_lua_script(url)
        execute_url = f'{self.base_url}/execute?lua_source=' + quote(lua)
        response = requests.get(execute_url)
        # On navigation failure the Lua script returns {"error": ...} instead of "html".
        return response.json()["html"]
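

# A minimal usage sketch (not part of the original file): SplashDriver expects a
# Splash rendering service reachable at base_url. The container command and the
# localhost address below are common defaults, not values taken from this repo.
#
#   docker run -p 8050:8050 scrapinghub/splash
#   driver = SplashDriver(base_url="http://localhost:8050")
#   html = driver.get("https://example.com")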


class WebScraper(BaseTool):
    def __init__(self):
        name = "web_scraper"
        description = "When a specific website address is mentioned, relevant information can be fetched from that site."
        super().__init__(name, description)
        self.extractor = URLExtract()
        self.driver = SplashDriver(base_url="http://192.168.1.100:8050")
        self.execute = self.search

    @staticmethod
    def _parse_html(html: str) -> str:
        """Strip noisy tags and convert the HTML to plain text."""
        html = re.sub(r"<style.*?>.*?</style>", "", html, flags=re.S)
        html = re.sub(r"<script.*?>.*?</script>", "", html, flags=re.S)
        html = re.sub(r"<textarea.*?>.*?</textarea>", "", html, flags=re.S)
        html = re.sub(r"<link.*?/>", "", html, flags=re.S)
        h = html2text.HTML2Text()
        h.ignore_links = False
        h.ignore_images = True
        h.ignore_tables = False
        h.ignore_emphasis = True
        h.ignore_headers = True
        h.ignore_br = True
        h.body_width = 0
        text = h.handle(html)
        # Drop empty Markdown links left over from the conversion.
        text = re.sub(r"\[]\(.*?\)", "", text, flags=re.S)
        return text
    def set_headers(self, headers: dict):
        self.driver.set_options(headers)

    def split_urls(self, message: str):
        """Extract the unique URLs contained in a message."""
        urls = self.extractor.find_urls(message)
        return list(set(urls))

    def get_uri_resource(self, url):
        """Fetch a URL and convert the page to text with absolute links."""
        try:
            html = self.driver.get(url)
            protocol = url.split("://")[0]
            markdown = self._parse_html(html)
            base_url = url[:-1] if url.endswith("/") else url
            # Rewrite protocol-relative and root-relative links to absolute ones.
            markdown = markdown.replace("(//", f"({protocol}://")
            markdown = markdown.replace("(/", f"({base_url}/")
            return markdown
        except Exception as e:
            print(f"Error fetching {url}: {e}")
            return "Failed to fetch the web page.\n"
    def search(self, message: str):
        urls = self.extractor.find_urls(message)
        uri_resource = "Web page content:\n"
        if urls:
            for url in urls:
                uri_resource += self.get_uri_resource(url)
        return self.normal(uri_resource)


web_scraper = WebScraper()

if __name__ == '__main__':
    question = "https://cn.bing.com/search?q=%E5%8B%92%E5%B8%83%E6%9C%97%E8%A9%B9%E5%A7%86%E6%96%AF%E6%9C%80%E8%BF%91%E7%9A%84%E6%88%98%E7%BB%A9"
    result = web_scraper.search(question)
    print(result["data"])
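    # A hedged example (not in the original file): forwarding extra request
    # headers to Splash before fetching; the cookie value is a placeholder.
    # web_scraper.set_headers({"Cookie": "session=<placeholder>"})
    # print(web_scraper.search("https://example.com")["data"])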