84 lines
2.8 KiB
Python
84 lines
2.8 KiB
Python
#!/usr/bin/env python
|
|
# -*- coding:utf-8 -*-
|
|
# @Filename: weather.py
|
|
# @Author: lychang
|
|
# @Time: 7/5/2023 5:53 PM
|
|
import re
|
|
import json
|
|
import requests
|
|
from fuzzywuzzy import process
|
|
|
|
from core.types import BaseTool
|
|
from extension.standard import resource_manager
|
|
|
|
|
|
class CityMatcher:
    """Fuzzy lookup from an area name to its area id.

    The name -> id table is built once, at construction time, from the
    ``city.json`` resource.  That resource is read as a three-level nested
    mapping (presumably province -> city -> district; confirm against the
    data file) whose leaf entries carry ``NAMECN`` and ``AREAID`` keys.
    """

    def __init__(self):
        # The resource payload is decoded as UTF-8 before JSON parsing.
        self.data = json.loads(resource_manager.get("json", "city.json").decode('utf-8'))
        self.area_map = self._build_area_map()

    def _build_area_map(self):
        """Flatten ``self.data`` into a ``{NAMECN: AREAID}`` dictionary.

        Only the leaf entries are used; the keys of the three nesting levels
        are ignored.  When two leaves share the same ``NAMECN`` the one
        visited last overwrites the earlier one.

        Raises:
            KeyError: if a leaf entry lacks ``NAMECN`` or ``AREAID``.
        """
        area_map = {}
        for cities in self.data.values():
            for districts in cities.values():
                for info in districts.values():
                    area_map[info['NAMECN']] = info['AREAID']
        return area_map

    def find_area_id(self, area_name, threshold=80, limit=5):
        """Return the area ids whose names fuzzily match ``area_name``.

        Args:
            area_name: Text to match against the known area names.
            threshold: Minimum match score for a candidate to be kept.
            limit: Maximum number of candidates requested from the matcher
                before the threshold filter is applied.

        Returns:
            A list of area ids, in the order the matcher ranked them; empty
            when no candidate reaches ``threshold``.
        """
        candidates = process.extract(area_name, self.area_map.keys(), limit=limit)
        return [self.area_map[name] for name, score in candidates if score >= threshold]
|
|
|
|
|
|
# Module-level CityMatcher instance used by WeatherSearch._get_city_code.
# Constructing it reads and parses the city.json resource at import time.
matcher = CityMatcher()
|
|
|
|
|
|
class WeatherSearch(BaseTool):
    """Tool that fetches forecast text from weather.com.cn for areas named in a message."""

    # Browser-like User-Agent sent with every forecast page request.
    headers = {
        "User-Agent": "Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 "
                      "(KHTML, like Gecko) Chrome/115.0.0.0 Safari/537.36 Edg/115.0.0.0"}

    # Area id queried when no area name in the message matches well enough.
    # NOTE(review): presumably a sensible default city -- confirm which one.
    _DEFAULT_CITY_CODE = "101030100"

    # Seconds to wait for the forecast page; without a timeout a stalled
    # connection would block the caller indefinitely.
    _REQUEST_TIMEOUT = 10

    # The <ul> element that holds the per-day forecast entries.
    _FORECAST_LIST_RE = re.compile('<ul class="t clearfix">(.*?)</ul>', re.S)
    # One entry: heading, first paragraph, and the "tem" paragraph.
    _FORECAST_DAY_RE = re.compile(
        '<h1>(.*?)</h1>.*?<p.*?>(.*?)</p><p class="tem">(.*?)</p><p '
        'class="win">')
    # Inline markup left inside the captured fragments.
    _INLINE_TAG_RE = re.compile('</?(?:span|i)>')

    def __init__(self):
        name = "weather_search"
        description = "在提及天气时,对天气数据进行查询"
        super().__init__(name, description)
        # Expose ``search`` as this tool's ``execute`` entry point.
        self.execute = self.search

    @staticmethod
    def _get_city_code(message: str):
        """Return the area ids matched in ``message``, or the default id if none match."""
        return matcher.find_area_id(message) or [WeatherSearch._DEFAULT_CITY_CODE]

    @staticmethod
    def _get_sub_task(message: str):
        """Return the text after the last ``sub_task:`` marker, or ``message`` unchanged.

        ``str.split`` yields the whole string as its only element when the
        marker is absent, so no separate membership test is needed.
        """
        return message.split("sub_task:")[-1]

    def _get_city_info(self, city_code: str):
        """Download the forecast page for ``city_code`` and return it as plain text.

        Returns:
            ``city_code`` on the first line, followed by one line per forecast
            entry found on the page.  If the page has no forecast list, only
            the ``city_code`` line is returned.

        Raises:
            requests.RequestException: on network failure or timeout.
        """
        url = f"http://www.weather.com.cn/weather/{city_code}.shtml"
        resp = requests.get(url, headers=self.headers, timeout=self._REQUEST_TIMEOUT)
        resp.encoding = "utf-8"
        return self._format_forecast(city_code, resp.text)

    @classmethod
    def _format_forecast(cls, city_code, page):
        """Extract the forecast entries from ``page`` and render them as text lines."""
        lines = [city_code]
        forecast_list = cls._FORECAST_LIST_RE.search(page)
        # A missing list (unknown area id, changed layout, error page) yields
        # a result with the area id only instead of raising IndexError.
        if forecast_list is not None:
            weather_html = forecast_list.group(1).replace("\n", "")
            lines.extend(" ".join(day) for day in cls._FORECAST_DAY_RE.findall(weather_html))
        return cls._INLINE_TAG_RE.sub("", "\n".join(lines) + "\n")

    def search(self, message: str):
        """Look up the forecast for every area matched in ``message``.

        The per-area texts are concatenated and passed through ``self.normal``
        (defined outside this class; presumably wraps the tool result).
        """
        sub_task = self._get_sub_task(message)
        city_info = "".join(
            self._get_city_info(city_code) for city_code in self._get_city_code(sub_task))
        return self.normal(city_info)
|
|
|
|
|
|
# Module-level WeatherSearch instance, created when this module is imported.
weather_search = WeatherSearch()
|